Cesar Rodas 1 жил өмнө
parent
commit
b1444dbbdd

+ 21 - 5
client.js

@@ -14,7 +14,7 @@ async function deposit(account, amount, asset) {
 			memo: "deposit",
 			account,
 			amount: amount.toString(),
-			status: 'settled',
+			status: 'pending',
 			asset,
 		})
 	}));
@@ -48,16 +48,32 @@ async function trade(amount, asset, from, to) {
 					asset,
 				}
 			],
-			status: 'settled',
+			status: 'pending',
 			asset,
 		})
 	}));
 	return response.json();
 }
 
+async function settle(id) {
+	const response = (await fetch(`http://127.0.0.1:8080/${id}`, {
+		method: "POST",
+		headers: {
+			"Content-Type": "application/json",
+		},
+		body: JSON.stringify({
+			status: "settled",
+			memo: "trade",
+		})
+	}));
+	return response.json();
+}
+
 async function test() {
-	console.log(await deposit(addr1, 100, "BTC"));
-	console.log(await trade(10, "BTC", addr1, addr2))
+	const d = (await deposit(addr1, 100, "BTC"));
+	console.log(d);
+	console.log(await settle(d.id));
+	//console.log(await trade(10, "BTC", addr1, addr2))
 }
 
-test()
+test()

+ 7 - 1
utxo/Cargo.toml

@@ -25,7 +25,13 @@ thiserror = "1.0.48"
 tokio = { version = "1.32.0", features = ["full"] }
 
 [dev-dependencies]
-sqlx = { version = "0.7.1", features = ["sqlite"] }
+sqlx = { version = "0.7.1", features = [
+    "runtime-tokio",
+    "runtime-async-std-native-tls",
+    "tls-native-tls",
+    "sqlite",
+    "chrono",
+] }
 
 [features]
 default = []

+ 110 - 0
utxo/src/changelog.rs

@@ -0,0 +1,110 @@
+use chrono::{serde::ts_milliseconds, DateTime, Utc};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use sha2::{Digest, Sha256};
+use std::collections::{HashMap, VecDeque};
+
+#[derive(thiserror::Error, Debug, Serialize)]
+pub enum Error {
+    #[error("Missing change {0:?}")]
+    MissingChange(Vec<u8>),
+
+    #[error("Unexpected changes {0:?}")]
+    UnexpectedChanges(Vec<Vec<u8>>),
+
+    #[error("Bincode: {0}")]
+    #[serde(serialize_with = "crate::error::serialize_to_string")]
+    Bincode(#[from] bincode::Error),
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(bound = "T: Serialize + DeserializeOwned")]
+pub struct Changelog<T: DeserializeOwned + Serialize + Send + Sync> {
+    #[serde(skip)]
+    pub previous: Option<Vec<u8>>,
+    #[serde(skip)]
+    pub object_id: Vec<u8>,
+    #[serde(flatten)]
+    pub change: T,
+    #[serde(with = "ts_milliseconds")]
+    pub created_at: DateTime<Utc>,
+}
+
+impl<T: DeserializeOwned + Serialize + Send + Sync> Changelog<T> {
+    pub fn new(previous: Option<Vec<u8>>, object_id: Vec<u8>, change: T) -> Changelog<T> {
+        Self {
+            previous,
+            object_id,
+            change,
+            created_at: Utc::now(),
+        }
+    }
+
+    pub fn new_from_db(
+        previous: Option<Vec<u8>>,
+        object_id: Vec<u8>,
+        change: T,
+        created_at: DateTime<Utc>,
+    ) -> Changelog<T> {
+        Self {
+            previous,
+            object_id,
+            change,
+            created_at,
+        }
+    }
+
+    pub fn id(&self) -> Result<Vec<u8>, Error> {
+        let mut hasher = Sha256::new();
+        hasher.update(&self.object_id);
+        hasher.update(if let Some(v) = self.previous.as_ref() {
+            v.clone()
+        } else {
+            vec![0, 0]
+        });
+        hasher.update(&bincode::serialize(&self.change)?);
+        hasher.update(&bincode::serialize(&self.created_at)?);
+        Ok(hasher.finalize().to_vec())
+    }
+}
+
+pub fn sort_changes<T: DeserializeOwned + Serialize + Send + Sync>(
+    changes: Vec<Changelog<T>>,
+    last_change: Vec<u8>,
+) -> Result<Vec<Changelog<T>>, Error> {
+    let mut changes_by_id = changes
+        .into_iter()
+        .map(|a| a.id().map(|id| (id, a)))
+        .collect::<Result<HashMap<Vec<u8>, Changelog<T>>, _>>()?;
+
+    println!("changes_by_id: {:?}", changes_by_id.keys());
+
+    let mut sorted_changes = VecDeque::new();
+
+    let last_change = match changes_by_id.remove(&last_change) {
+        Some(change) => change,
+        None => return Err(Error::MissingChange(last_change)),
+    };
+
+    sorted_changes.push_front(last_change);
+
+    loop {
+        let first_element = sorted_changes.get(0).unwrap();
+        if let Some(id) = first_element.previous.as_ref() {
+            let last_change = match changes_by_id.remove(id) {
+                Some(change) => change,
+                None => return Err(Error::MissingChange(id.clone())),
+            };
+            sorted_changes.push_front(last_change);
+        } else {
+            break;
+        }
+    }
+
+    if !changes_by_id.is_empty() {
+        return Err(Error::UnexpectedChanges(
+            changes_by_id.into_keys().collect(),
+        ));
+    }
+
+    Ok(sorted_changes.into())
+}

+ 10 - 1
utxo/src/error.rs

@@ -1,5 +1,6 @@
 use crate::{amount, asset::AssetId, storage, transaction, AccountId};
-use serde::Serialize;
+use serde::{Serialize, Serializer};
+use std::fmt::Display;
 
 #[derive(thiserror::Error, Debug, Serialize)]
 pub enum Error {
@@ -21,3 +22,11 @@ pub enum Error {
     #[error("Invalid amount: {0}")]
     InvalidAmount(#[from] amount::Error),
 }
+
+pub fn serialize_to_string<S, T>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+    T: ToString + Display,
+{
+    serializer.serialize_str(&value.to_string())
+}

+ 7 - 0
utxo/src/id.rs

@@ -135,6 +135,13 @@ macro_rules! Id {
                 &self.bytes
             }
         }
+
+        impl std::ops::Deref for $id {
+            type Target = [u8; 32];
+            fn deref(&self) -> &Self::Target {
+                &self.bytes
+            }
+        }
     };
 }
 

+ 1 - 0
utxo/src/lib.rs

@@ -1,6 +1,7 @@
 mod amount;
 mod asset;
 mod asset_manager;
+mod changelog;
 mod error;
 mod id;
 mod ledger;

+ 37 - 12
utxo/src/sqlite/batch.rs

@@ -1,7 +1,9 @@
 use crate::{
+    changelog::Changelog,
     storage::{self, Error},
     AccountId, Payment, PaymentId, Status, Transaction, TransactionId,
 };
+use serde::{de::DeserializeOwned, Serialize};
 use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
 use std::marker::PhantomData;
 
@@ -35,26 +37,48 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             .map_err(|e| Error::Storage(e.to_string()))
     }
 
+    async fn store_changelog<T: DeserializeOwned + Serialize + Send + Sync>(
+        &mut self,
+        changelog: &Vec<Changelog<T>>,
+    ) -> Result<(), Error> {
+        for change in changelog.iter() {
+            let change_bytes =
+                bincode::serialize(&change.change).map_err(|e| Error::Storage(e.to_string()))?;
+
+            sqlx::query(
+                r#"
+                INSERT INTO "changelog"("id", "previous", "object_id", "change", "created_at")
+                VALUES(?, ?, ?, ?, ?)
+                ON CONFLICT("id")
+                    DO NOTHING
+            "#,
+            )
+            .bind(change.id().map_err(|e| Error::Storage(e.to_string()))?)
+            .bind(&change.previous)
+            .bind(&change.object_id)
+            .bind(change_bytes)
+            .bind(change.created_at)
+            .execute(&mut *self.inner)
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+        }
+        Ok(())
+    }
+
     async fn spend_payment(
         &mut self,
         payment_id: &PaymentId,
-        status: Status,
-        transaction_id: &TransactionId,
+        transaction_id: Option<&TransactionId>,
     ) -> Result<(), Error> {
         let result = sqlx::query(
             r#"
                 UPDATE payments SET "spent_by" = ?
-                WHERE "transaction_id" = ? AND "position_id" = ? AND ("spent_by" IS NULL OR "spent_by" = ?)
+                WHERE "transaction_id" = ? AND "position_id" = ?
             "#,
         )
-        .bind(if status.is_rollback() {
-            None
-        } else {
-            Some(transaction_id.to_string())
-        })
+        .bind(transaction_id.map(|t| t.to_string()))
         .bind(payment_id.transaction.to_string())
         .bind(payment_id.position.to_string())
-        .bind(transaction_id.to_string())
         .execute(&mut *self.inner)
         .await
         .map_err(|e| Error::SpendPayment(e.to_string()))?;
@@ -123,16 +147,17 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
     async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> {
         sqlx::query(
             r#"
-                INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "created_at", "updated_at")
-                VALUES(?, ?, ?, ?, ?, ?)
+                INSERT INTO "transactions"("transaction_id", "status", "type", "reference", "last_version", "created_at", "updated_at")
+                VALUES(?, ?, ?, ?, ?, ?, ?)
                 ON CONFLICT("transaction_id")
-                    DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at"
+                    DO UPDATE SET "status" = excluded."status", "updated_at" = excluded."updated_at", "last_version" = excluded."last_version"
             "#,
         )
         .bind(transaction.id().to_string())
         .bind::<u32>(transaction.status().into())
         .bind::<u32>(transaction.typ().into())
         .bind(transaction.reference())
+        .bind(transaction.last_version())
         .bind(transaction.created_at())
         .bind(transaction.updated_at())
         .execute(&mut *self.inner)

+ 69 - 4
utxo/src/sqlite/mod.rs

@@ -1,14 +1,16 @@
 use crate::{
     amount::AmountCents,
     asset::AssetId,
+    changelog::{sort_changes, Changelog},
     storage::{Error, Storage},
-    transaction::{from_db, Type},
+    transaction::{self, from_db, Type},
     AccountId, Amount, Asset, AssetManager, Payment, PaymentId, Status, TransactionId,
 };
 use chrono::NaiveDateTime;
 use futures::TryStreamExt;
+use serde::{de::DeserializeOwned, Serialize};
 use sqlx::{sqlite::SqliteRow, Executor, Row};
-use std::{collections::HashMap, marker::PhantomData};
+use std::{collections::HashMap, marker::PhantomData, ops::Deref};
 
 mod batch;
 
@@ -51,6 +53,7 @@ impl<'a> Sqlite<'a> {
                 "status" INTEGER NOT NULL,
                 "type" INTEGER NOT NULL,
                 "reference" TEXT NOT NULL,
+                "last_version" BLOB NOT NULL,
                 "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
                 "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
                 PRIMARY KEY ("transaction_id")
@@ -72,6 +75,15 @@ impl<'a> Sqlite<'a> {
                 "updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
                 PRIMARY KEY ("transaction_id", "payment_transaction_id", "payment_position_id")
             );
+            CREATE TABLE IF NOT EXISTS "changelog" (
+                "id" BLOB NOT NULL,
+                "object_id" BLOB NOT NULL,
+                "previous" BLOB DEFAULT NULL,
+                "change" BLOB NOT NULL,
+                "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
+                PRIMARY KEY ("id")
+            );
+            CREATE INDEX IF NOT EXISTS "changelog_object_id" ON "changelog" ("object_id", "created_at");
         "#,
         )
         .await
@@ -308,6 +320,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
                 "t"."status",
                 "t"."type",
                 "t"."reference",
+                "t"."last_version",
                 "t"."created_at",
                 "t"."updated_at"
             FROM
@@ -403,21 +416,34 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
             .try_get::<String, usize>(2)
             .map_err(|_| Error::Storage("Invalid reference".to_string()))?;
 
+        let last_change = transaction_row
+            .try_get::<Vec<u8>, usize>(3)
+            .map_err(|_| Error::Storage("Invalid last_version".to_string()))?;
+
         let created_at = transaction_row
-            .try_get::<NaiveDateTime, usize>(3)
+            .try_get::<NaiveDateTime, usize>(4)
             .map_err(|e| Error::InvalidDate(e.to_string()))?
             .and_utc();
 
         let updated_at = transaction_row
-            .try_get::<NaiveDateTime, usize>(4)
+            .try_get::<NaiveDateTime, usize>(5)
             .map_err(|e| Error::InvalidDate(e.to_string()))?
             .and_utc();
 
+        drop(transaction_row);
+        drop(create_result);
+        drop(conn);
+
+        let changes = self
+            .get_changelogs::<transaction::Changelog>(transaction_id.deref().to_vec())
+            .await?;
+
         Ok(from_db::Transaction {
             id: transaction_id.clone(),
             spend,
             create,
             typ,
+            changelog: sort_changes(changes, last_change)?,
             status,
             reference,
             created_at,
@@ -471,4 +497,43 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
 
         Ok(transactions)
     }
+
+    async fn get_changelogs<T: DeserializeOwned + Serialize + Send + Sync>(
+        &self,
+        object_id: Vec<u8>,
+    ) -> Result<Vec<Changelog<T>>, Error> {
+        let mut conn = self
+            .db
+            .acquire()
+            .await
+            .map_err(|e| Error::Storage(e.to_string()))?;
+
+        let rows = sqlx::query(
+            r#"SELECT  "previous", "change", "created_at", "id" FROM "changelog" WHERE "object_id" = ? ORDER BY "created_at" ASC"#,
+        ).bind(&object_id).fetch_all(&mut *conn).await.map_err(|e| Error::Storage(e.to_string()))?;
+
+        let mut results = vec![];
+
+        for row in rows.into_iter() {
+            let change: T = bincode::deserialize(
+                &row.try_get::<Vec<u8>, usize>(1)
+                    .map_err(|_| Error::Storage("Invalid change content".to_string()))?,
+            )
+            .map_err(|e| Error::Storage(e.to_string()))?;
+            let created_at = row
+                .try_get::<NaiveDateTime, usize>(2)
+                .map_err(|e| Error::InvalidDate(e.to_string()))?
+                .and_utc();
+
+            results.push(Changelog::new_from_db(
+                row.try_get::<Option<Vec<u8>>, usize>(0)
+                    .map_err(|_| Error::Storage("Invalid previous content".to_string()))?,
+                object_id.clone(),
+                change,
+                created_at,
+            ));
+        }
+
+        Ok(results)
+    }
 }

+ 17 - 3
utxo/src/storage.rs

@@ -1,10 +1,11 @@
 use crate::{
     amount::AmountCents,
     asset::AssetId,
+    changelog::{self, Changelog},
     transaction::{from_db, Type},
     AccountId, Amount, Payment, PaymentId, Status, Transaction, TransactionId,
 };
-use serde::Serialize;
+use serde::{de::DeserializeOwned, Serialize};
 
 #[derive(thiserror::Error, Debug, Serialize)]
 pub enum Error {
@@ -23,6 +24,9 @@ pub enum Error {
     #[error("Record not found")]
     NotFound,
 
+    #[error("Error while parsing changelog: {0}")]
+    Changelog(#[from] changelog::Error),
+
     /// TODO: Convert the AmountCents to Amount for better error reporting upstream
     #[error("Not enough unspent payments (missing {0} cents)")]
     NotEnoughUnspentPayments(AmountCents),
@@ -34,11 +38,15 @@ pub trait Batch<'a> {
 
     async fn commit(self) -> Result<(), Error>;
 
+    async fn store_changelog<T: DeserializeOwned + Serialize + Send + Sync>(
+        &mut self,
+        changelog: &Vec<Changelog<T>>,
+    ) -> Result<(), Error>;
+
     async fn spend_payment(
         &mut self,
         payment_id: &PaymentId,
-        status: Status,
-        transaction_id: &TransactionId,
+        transaction_id: Option<&TransactionId>,
     ) -> Result<(), Error>;
 
     async fn get_payment_status(
@@ -117,4 +125,10 @@ where
         account: &AccountId,
         typ: Option<Type>,
     ) -> Result<Vec<from_db::Transaction>, Error>;
+
+    /// Returns all changelogs associated with a given object_id. The result should be sorted from oldest to newest.
+    async fn get_changelogs<T: DeserializeOwned + Serialize + Send + Sync>(
+        &self,
+        object_id: Vec<u8>,
+    ) -> Result<Vec<Changelog<T>>, Error>;
 }

+ 8 - 0
utxo/src/transaction/changelog.rs

@@ -0,0 +1,8 @@
+use crate::Status;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Deserialize, Serialize, Clone)]
+pub struct Changelog {
+    pub status: Status,
+    pub reason: String,
+}

+ 5 - 12
utxo/src/transaction/error.rs

@@ -1,7 +1,5 @@
-use std::fmt::Display;
-
 use crate::{storage, Amount, Asset, Status, TransactionId};
-use serde::{Serialize, Serializer};
+use serde::Serialize;
 
 #[derive(thiserror::Error, Debug, Serialize)]
 pub enum Error {
@@ -33,20 +31,15 @@ pub enum Error {
     Storage(#[from] storage::Error),
 
     #[error("Internal error at serializing: {0}")]
-    #[serde(serialize_with = "serialize_to_string")]
+    #[serde(serialize_with = "crate::error::serialize_to_string")]
     Internal(#[from] Box<bincode::ErrorKind>),
 
+    #[error("Invalid changelog: {0}")]
+    Changelog(#[from] crate::changelog::Error),
+
     #[error("Invalid calculated id {0} (expected {1})")]
     InvalidTransactionId(TransactionId, TransactionId),
 
     #[error("Overflow")]
     Overflow,
 }
-
-fn serialize_to_string<S, T>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer,
-    T: ToString + Display,
-{
-    serializer.serialize_str(&value.to_string())
-}

+ 2 - 1
utxo/src/transaction/from_db.rs

@@ -1,5 +1,5 @@
 use super::Type;
-use crate::{Payment, Status, TransactionId};
+use crate::{changelog::Changelog, Payment, Status, TransactionId};
 use chrono::{DateTime, Utc};
 
 pub struct Transaction {
@@ -9,6 +9,7 @@ pub struct Transaction {
     pub reference: String,
     pub typ: Type,
     pub status: Status,
+    pub changelog: Vec<Changelog<super::Changelog>>,
     pub created_at: DateTime<Utc>,
     pub updated_at: DateTime<Utc>,
 }

+ 64 - 3
utxo/src/transaction/inner.rs

@@ -1,5 +1,6 @@
 use crate::{
     amount::AmountCents,
+    changelog::Changelog,
     storage::{Batch, Storage},
     transaction::*,
     AccountId, Amount, Asset, Payment, PaymentId, Status, TransactionId,
@@ -7,7 +8,7 @@ use crate::{
 use chrono::{serde::ts_milliseconds, DateTime, Utc};
 use serde::Serialize;
 use sha2::{Digest, Sha256};
-use std::collections::HashMap;
+use std::{collections::HashMap, ops::Deref};
 
 /// Transactions
 ///
@@ -47,6 +48,7 @@ pub struct Transaction {
     typ: Type,
     #[serde(skip_serializing_if = "Vec::is_empty")]
     creates: Vec<Payment>,
+    changelog: Vec<Changelog<super::Changelog>>,
     status: Status,
     #[serde(with = "ts_milliseconds")]
     created_at: DateTime<Utc>,
@@ -74,6 +76,16 @@ impl Transaction {
                 input
             })
             .collect();
+
+        let changelog: Vec<Changelog<changelog::Changelog>> = vec![Changelog::new(
+            None,
+            id.deref().to_vec(),
+            super::Changelog {
+                status: status.clone(),
+                reason: "".to_owned(),
+            },
+        )];
+
         Ok(Self {
             id,
             spends: spend,
@@ -81,6 +93,7 @@ impl Transaction {
             typ: Type::Withdrawal,
             reference,
             status,
+            changelog,
             created_at,
             updated_at: Utc::now(),
         })
@@ -116,6 +129,15 @@ impl Transaction {
             })
             .collect();
 
+        let changelog: Vec<Changelog<changelog::Changelog>> = vec![Changelog::new(
+            None,
+            id.deref().to_vec(),
+            super::Changelog {
+                status: status.clone(),
+                reason: "".to_owned(),
+            },
+        )];
+
         Ok(Self {
             id,
             spends: vec![],
@@ -123,6 +145,7 @@ impl Transaction {
             reference,
             typ: Type::Deposit,
             status,
+            changelog,
             created_at,
             updated_at: Utc::now(),
         })
@@ -174,13 +197,23 @@ impl Transaction {
             })
             .collect();
 
+        let changelog: Vec<Changelog<changelog::Changelog>> = vec![Changelog::new(
+            None,
+            id.deref().to_vec(),
+            super::Changelog {
+                status: status.clone(),
+                reason: "".to_owned(),
+            },
+        )];
+
         Ok(Self {
-            id,
+            id: id,
             reference,
             spends: spend,
             typ,
             creates: create,
             status,
+            changelog,
             created_at,
             updated_at: Utc::now(),
         })
@@ -230,6 +263,15 @@ impl Transaction {
             self.creates.iter_mut().for_each(|payment| {
                 payment.status = new_status.clone();
             });
+            let previous = self.changelog.last().map(|x| x.id()).transpose()?;
+            self.changelog.push(Changelog::new(
+                previous,
+                self.id.deref().to_vec(),
+                super::Changelog {
+                    status: new_status.clone(),
+                    reason: "".to_owned(),
+                },
+            ));
             self.status = new_status;
             Ok(())
         } else {
@@ -354,6 +396,14 @@ impl Transaction {
         &self.reference
     }
 
+    pub fn last_version(&self) -> Option<Vec<u8>> {
+        if let Some(Ok(x)) = self.changelog.last().map(|x| x.id()) {
+            Some(x)
+        } else {
+            None
+        }
+    }
+
     pub fn created_at(&self) -> DateTime<Utc> {
         self.created_at
     }
@@ -384,12 +434,22 @@ impl Transaction {
         }
         for input in self.spends.iter() {
             batch
-                .spend_payment(&input.id, self.status.clone(), &self.id)
+                .spend_payment(
+                    &input.id,
+                    if self.status.is_rollback() {
+                        None
+                    } else {
+                        Some(&self.id)
+                    },
+                )
                 .await?;
             batch
                 .relate_account_to_transaction(&self, &input.to)
                 .await?;
         }
+
+        batch.store_changelog(&self.changelog).await?;
+
         batch.commit().await?;
         Ok(())
     }
@@ -406,6 +466,7 @@ impl TryFrom<from_db::Transaction> for Transaction {
             creates: value.create,
             reference: value.reference,
             status: value.status,
+            changelog: value.changelog,
             created_at: value.created_at,
             updated_at: value.updated_at,
         };

+ 2 - 1
utxo/src/transaction/mod.rs

@@ -1,6 +1,7 @@
+mod changelog;
 mod error;
 pub mod from_db;
 mod inner;
 mod typ;
 
-pub use self::{error::Error, inner::Transaction, typ::Type};
+pub use self::{changelog::Changelog, error::Error, inner::Transaction, typ::Type};