123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- use chrono::{serde::ts_milliseconds, DateTime, Utc};
- use serde::{de::DeserializeOwned, Deserialize, Serialize};
- use sha2::{Digest, Sha256};
- use std::collections::{HashMap, VecDeque};
- #[derive(thiserror::Error, Debug, Serialize)]
- pub enum Error {
- #[error("Missing change {0:?}")]
- MissingChange(Vec<u8>),
- #[error("Unexpected changes {0:?}")]
- UnexpectedChanges(Vec<Vec<u8>>),
- #[error("Bincode: {0}")]
- #[serde(serialize_with = "crate::error::serialize_to_string")]
- Bincode(#[from] bincode::Error),
- }
- #[derive(Clone, Debug, Serialize, Deserialize)]
- #[serde(bound = "T: Serialize + DeserializeOwned")]
- pub struct Changelog<T: DeserializeOwned + Serialize + Send + Sync> {
- #[serde(skip)]
- pub previous: Option<Vec<u8>>,
- #[serde(skip)]
- pub object_id: Vec<u8>,
- #[serde(flatten)]
- pub change: T,
- #[serde(with = "ts_milliseconds")]
- pub updated_at: DateTime<Utc>,
- }
- impl<T: DeserializeOwned + Serialize + Send + Sync> Changelog<T> {
- pub fn new(previous: Option<Vec<u8>>, object_id: Vec<u8>, change: T) -> Changelog<T> {
- Self {
- previous,
- object_id,
- change,
- updated_at: Utc::now(),
- }
- }
- pub fn new_from_db(
- previous: Option<Vec<u8>>,
- object_id: Vec<u8>,
- change: T,
- created_at: DateTime<Utc>,
- ) -> Changelog<T> {
- Self {
- previous,
- object_id,
- change,
- updated_at: created_at,
- }
- }
- pub fn id(&self) -> Result<Vec<u8>, Error> {
- let mut hasher = Sha256::new();
- hasher.update(&self.object_id);
- hasher.update(if let Some(v) = self.previous.as_ref() {
- v.clone()
- } else {
- vec![0, 0]
- });
- hasher.update(&bincode::serialize(&self.change)?);
- hasher.update(&bincode::serialize(&self.updated_at)?);
- Ok(hasher.finalize().to_vec())
- }
- }
- pub fn sort_changes<T: DeserializeOwned + Serialize + Send + Sync>(
- changes: Vec<Changelog<T>>,
- last_change: Vec<u8>,
- ) -> Result<Vec<Changelog<T>>, Error> {
- let mut changes_by_id = changes
- .into_iter()
- .map(|a| a.id().map(|id| (id, a)))
- .collect::<Result<HashMap<Vec<u8>, Changelog<T>>, _>>()?;
- let mut sorted_changes = VecDeque::new();
- let last_change = match changes_by_id.remove(&last_change) {
- Some(change) => change,
- None => return Err(Error::MissingChange(last_change)),
- };
- sorted_changes.push_front(last_change);
- loop {
- let first_element = sorted_changes.get(0).unwrap();
- if let Some(id) = first_element.previous.as_ref() {
- let last_change = match changes_by_id.remove(id) {
- Some(change) => change,
- None => return Err(Error::MissingChange(id.clone())),
- };
- sorted_changes.push_front(last_change);
- } else {
- break;
- }
- }
- if !changes_by_id.is_empty() {
- return Err(Error::UnexpectedChanges(
- changes_by_id.into_keys().collect(),
- ));
- }
- Ok(sorted_changes.into())
- }
|