changelog.rs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. use chrono::{serde::ts_milliseconds, DateTime, Utc};
  2. use serde::{de::DeserializeOwned, Deserialize, Serialize};
  3. use sha2::{Digest, Sha256};
  4. use std::collections::{HashMap, VecDeque};
  5. #[derive(thiserror::Error, Debug, Serialize)]
  6. pub enum Error {
  7. #[error("Missing change {0:?}")]
  8. MissingChange(Vec<u8>),
  9. #[error("Unexpected changes {0:?}")]
  10. UnexpectedChanges(Vec<Vec<u8>>),
  11. #[error("Bincode: {0}")]
  12. #[serde(serialize_with = "crate::error::serialize_to_string")]
  13. Bincode(#[from] bincode::Error),
  14. }
  15. #[derive(Clone, Debug, Serialize, Deserialize)]
  16. #[serde(bound = "T: Serialize + DeserializeOwned")]
  17. pub struct Changelog<T: DeserializeOwned + Serialize + Send + Sync> {
  18. #[serde(skip)]
  19. pub previous: Option<Vec<u8>>,
  20. #[serde(skip)]
  21. pub object_id: Vec<u8>,
  22. #[serde(flatten)]
  23. pub change: T,
  24. #[serde(with = "ts_milliseconds")]
  25. pub updated_at: DateTime<Utc>,
  26. }
  27. impl<T: DeserializeOwned + Serialize + Send + Sync> Changelog<T> {
  28. pub fn new(previous: Option<Vec<u8>>, object_id: Vec<u8>, change: T) -> Changelog<T> {
  29. Self {
  30. previous,
  31. object_id,
  32. change,
  33. updated_at: Utc::now(),
  34. }
  35. }
  36. pub fn new_from_db(
  37. previous: Option<Vec<u8>>,
  38. object_id: Vec<u8>,
  39. change: T,
  40. created_at: DateTime<Utc>,
  41. ) -> Changelog<T> {
  42. Self {
  43. previous,
  44. object_id,
  45. change,
  46. updated_at: created_at,
  47. }
  48. }
  49. pub fn id(&self) -> Result<Vec<u8>, Error> {
  50. let mut hasher = Sha256::new();
  51. hasher.update(&self.object_id);
  52. hasher.update(if let Some(v) = self.previous.as_ref() {
  53. v.clone()
  54. } else {
  55. vec![0, 0]
  56. });
  57. hasher.update(&bincode::serialize(&self.change)?);
  58. hasher.update(&bincode::serialize(&self.updated_at)?);
  59. Ok(hasher.finalize().to_vec())
  60. }
  61. }
  62. pub fn sort_changes<T: DeserializeOwned + Serialize + Send + Sync>(
  63. changes: Vec<Changelog<T>>,
  64. last_change: Vec<u8>,
  65. ) -> Result<Vec<Changelog<T>>, Error> {
  66. let mut changes_by_id = changes
  67. .into_iter()
  68. .map(|a| a.id().map(|id| (id, a)))
  69. .collect::<Result<HashMap<Vec<u8>, Changelog<T>>, _>>()?;
  70. let mut sorted_changes = VecDeque::new();
  71. let last_change = match changes_by_id.remove(&last_change) {
  72. Some(change) => change,
  73. None => return Err(Error::MissingChange(last_change)),
  74. };
  75. sorted_changes.push_front(last_change);
  76. loop {
  77. let first_element = sorted_changes.get(0).unwrap();
  78. if let Some(id) = first_element.previous.as_ref() {
  79. let last_change = match changes_by_id.remove(id) {
  80. Some(change) => change,
  81. None => return Err(Error::MissingChange(id.clone())),
  82. };
  83. sorted_changes.push_front(last_change);
  84. } else {
  85. break;
  86. }
  87. }
  88. if !changes_by_id.is_empty() {
  89. return Err(Error::UnexpectedChanges(
  90. changes_by_id.into_keys().collect(),
  91. ));
  92. }
  93. Ok(sorted_changes.into())
  94. }