Cesar Rodas 1 жил өмнө
parent
commit
20ce47e3f3

+ 2 - 5
utxo/src/storage/cache/batch.rs

@@ -75,14 +75,11 @@ where
         Ok(())
     }
 
-    async fn get_transaction_with_revision(
+    async fn get_and_lock_transaction(
         &mut self,
         transaction_id: &RevisionId,
-        revision_id: &RevisionId,
     ) -> Result<Transaction, Error> {
-        self.inner
-            .get_transaction_with_revision(transaction_id, revision_id)
-            .await
+        self.inner.get_and_lock_transaction(transaction_id).await
     }
 
     async fn relate_account_to_transaction(

+ 16 - 6
utxo/src/storage/mod.rs

@@ -3,6 +3,7 @@ use crate::{
     amount::AmountCents, payment::PaymentTo, transaction::Type, AccountId, Amount, Asset,
     PaymentFrom, PaymentId, RevisionId, Transaction,
 };
+//use chrono::{DateTime, Utc};
 use serde::Serialize;
 
 pub mod cache;
@@ -102,14 +103,12 @@ pub trait Batch<'a> {
     /// The batch is commited into the storage layer and it is consumed.
     async fn commit(self) -> Result<(), Error>;
 
-    /// Gets a transaction id from the Batch transaction, making sure the current revision is still
-    /// the known one. It is expected this function to be called before any update to the
-    /// transaction, to make sure that only the current revision is updated, therefore this function
-    /// is expected to put some hold on the transaction
-    async fn get_transaction_with_revision(
+    /// Returns a transaction from the batch's point of view, giving the ability to lock the
+    /// resource to update it. The lock should be released when the transaction is dropped or
+    /// committed.
+    async fn get_and_lock_transaction(
         &mut self,
         transaction_id: &RevisionId,
-        revision_id: &RevisionId,
     ) -> Result<Transaction, Error>;
 
     /// Flag the given payments as spent by the given transaction.
@@ -212,6 +211,17 @@ pub trait Storage {
         target_amount: AmountCents,
     ) -> Result<Vec<PaymentFrom>, Error>;
 
+    /*
+    ///
+    async fn get_revision(&self, revision_id: &RevisionId) -> Result<Transaction, Error>;
+
+    ///
+    async fn get_all_revisions(
+        &self,
+        transaction_id: &RevisionId,
+    ) -> Result<Vec<(RevisionId, String, DateTime<Utc>)>, Error>;
+    */
+
     /// Returns a transaction object by id
     async fn get_transaction(&self, transaction_id: &RevisionId) -> Result<Transaction, Error>;
 

+ 20 - 14
utxo/src/storage/sqlite/batch.rs

@@ -56,14 +56,14 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
         Ok(())
     }
 
-    async fn get_transaction_with_revision(
+    async fn get_and_lock_transaction(
         &mut self,
         transaction_id: &RevisionId,
-        revision_id: &RevisionId,
     ) -> Result<Transaction, Error> {
         let row = sqlx::query(
             r#"
-            SELECT
+           SELECT
+                "t"."blob_id" as "current_id",
                 "b"."blob"
             FROM
                 "transactions" as "t",
@@ -71,22 +71,28 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             WHERE
                 "t"."transaction_id" = ?
                 AND "t"."blob_id" = "b"."id"
-                AND "t"."blob_id" = ?
-        "#,
+                "#,
         )
         .bind(transaction_id.to_string())
-        .bind(revision_id.to_string())
         .fetch_one(&mut *self.inner)
         .await
         .map_err(|e| Error::Storage(e.to_string()))?;
 
         let encoded = row
-            .try_get::<Vec<u8>, usize>(0)
+            .try_get::<Vec<u8>, usize>(1)
             .map_err(|e| Error::Storage(e.to_string()))?;
-        bincode::deserialize::<Revision>(&encoded)
-            .map_err(|e| Error::Storage(e.to_string()))?
-            .try_into()
-            .map_err(|e: crate::transaction::Error| Error::Storage(e.to_string()))
+        Transaction::from_revision(
+            bincode::deserialize::<Revision>(&encoded)
+                .map_err(|e| Error::Storage(e.to_string()))?,
+            Some(transaction_id.clone()),
+            Some(
+                row.try_get::<String, usize>(0)
+                    .map_err(|e| Error::Storage(e.to_string()))?
+                    .parse::<RevisionId>()
+                    .map_err(|e| Error::Storage(e.to_string()))?,
+            ),
+        )
+        .map_err(|e| Error::Encoding(e.to_string()))
     }
 
     async fn commit(self) -> Result<(), Error> {
@@ -260,7 +266,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
                 VALUES(?, ?, ?, ?, ?)
             "#,
         )
-        .bind(transaction.current.to_string())
+        .bind(transaction.revision_id.to_string())
         .bind(transaction.id.to_string())
         .bind(bincode::serialize(transaction.inner()).map_err(|e| Error::Encoding(e.to_string()))?)
         .bind(transaction.status().to_string())
@@ -276,7 +282,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
                 WHERE "transaction_id" = ? and "blob_id" = ?
                 "#,
             )
-            .bind(transaction.current.to_string())
+            .bind(transaction.revision_id.to_string())
             .bind(transaction.status().to_string())
             .bind(transaction.id.to_string())
             .bind(previous.to_string())
@@ -295,7 +301,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             "#,
             )
             .bind(transaction.id.to_string())
-            .bind(transaction.current.to_string())
+            .bind(transaction.revision_id.to_string())
             .bind(transaction.status().to_string())
             .execute(&mut *self.inner)
             .await

+ 36 - 17
utxo/src/storage/sqlite/mod.rs

@@ -241,6 +241,7 @@ impl Storage for SQLite {
         let row = sqlx::query(
             r#"
             SELECT
+                "t"."blob_id" as "current_id",
                 "b"."blob"
             FROM
                 "transactions" as "t",
@@ -256,13 +257,21 @@ impl Storage for SQLite {
         .map_err(|e| Error::Storage(e.to_string()))?;
 
         let encoded = row
-            .try_get::<Vec<u8>, usize>(0)
+            .try_get::<Vec<u8>, usize>(1)
             .map_err(|e| Error::Storage(e.to_string()))?;
 
-        bincode::deserialize::<Revision>(&encoded)
-            .map_err(|e| Error::Storage(e.to_string()))?
-            .try_into()
-            .map_err(|e: crate::transaction::Error| Error::Storage(e.to_string()))
+        Transaction::from_revision(
+            bincode::deserialize::<Revision>(&encoded)
+                .map_err(|e| Error::Storage(e.to_string()))?,
+            Some(transaction_id.clone()),
+            Some(
+                row.try_get::<String, usize>(0)
+                    .map_err(|e| Error::Storage(e.to_string()))?
+                    .parse::<RevisionId>()
+                    .map_err(|e| Error::Storage(e.to_string()))?,
+            ),
+        )
+        .map_err(|e| Error::Encoding(e.to_string()))
     }
 
     async fn get_transactions(
@@ -279,7 +288,8 @@ impl Storage for SQLite {
 
         let sql = if types.is_empty() {
             r#"SELECT
-                "b"."id",
+                "t"."transaction_id",
+                "t"."blob_id",
                 "b"."blob"
             FROM
                 "transaction_accounts" as "ta",
@@ -299,7 +309,8 @@ impl Storage for SQLite {
                 .join(",");
             format!(
                 r#"SELECT
-                    "b"."id",
+                    "t"."transaction_id",
+                    "t"."blob_id",
                     "b"."blob"
                 FROM
                     "transaction_accounts" as "ta",
@@ -321,19 +332,27 @@ impl Storage for SQLite {
             .map_err(|e| Error::Storage(e.to_string()))?
             .into_iter()
             .map(|row| {
-                let id = row
-                    .try_get::<String, usize>(0)
-                    .map_err(|e| Error::Storage(e.to_string()))?;
                 let encoded = row
-                    .try_get::<Vec<u8>, usize>(1)
+                    .try_get::<Vec<u8>, usize>(2)
                     .map_err(|e| Error::Storage(e.to_string()))?;
 
-                bincode::deserialize::<Revision>(&encoded)
-                    .map_err(|e| Error::Storage(format!("Error parsing {id}: {}", e.to_string())))?
-                    .try_into()
-                    .map_err(|e| {
-                        Error::Storage(format!("Error converting {id} to Transaction: {}", e))
-                    })
+                Transaction::from_revision(
+                    bincode::deserialize::<Revision>(&encoded)
+                        .map_err(|e| Error::Storage(e.to_string()))?,
+                    Some(
+                        row.try_get::<String, usize>(0)
+                            .map_err(|e| Error::Storage(e.to_string()))?
+                            .parse::<RevisionId>()
+                            .map_err(|e| Error::Storage(e.to_string()))?,
+                    ),
+                    Some(
+                        row.try_get::<String, usize>(1)
+                            .map_err(|e| Error::Storage(e.to_string()))?
+                            .parse::<RevisionId>()
+                            .map_err(|e| Error::Storage(e.to_string()))?,
+                    ),
+                )
+                .map_err(|e| Error::Encoding(e.to_string()))
             })
             .collect::<Result<Vec<Transaction>, Error>>()
     }

+ 114 - 104
utxo/src/transaction/inner.rs

@@ -23,8 +23,6 @@ use std::collections::HashMap;
 /// security that its content was not altered.
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Revision {
-    /// A pointer to the first revision of the transaction.
-    first: Option<RevisionId>,
     /// Any previous transaction that this transaction is replacing.
     previous: Option<RevisionId>,
     /// A human-readable description of the transaction changes.
@@ -161,26 +159,18 @@ impl Revision {
 pub struct Transaction {
     /// The RevisionId is the RevisionID of the first revision of the transaction.
     pub id: RevisionId,
+
     /// Current Revision ID.
-    pub current: RevisionId,
+    pub revision_id: RevisionId,
+
+    /// Latest revision of this transaction
+    pub lastest_revision: RevisionId,
+
     /// The transaction inner details
     #[serde(flatten)]
     inner: Revision,
 }
 
-impl TryFrom<Revision> for Transaction {
-    type Error = Error;
-
-    fn try_from(inner: Revision) -> Result<Self, Self::Error> {
-        let id = inner.calculate_id()?;
-        Ok(Transaction {
-            id: inner.first.clone().unwrap_or_else(|| id.clone()),
-            current: id,
-            inner,
-        })
-    }
-}
-
 impl Transaction {
     /// Creates a new external deposit transaction
     ///
@@ -192,52 +182,25 @@ impl Transaction {
         status: Status,
         pay_to: Vec<(AccountId, Amount)>,
     ) -> Result<Transaction, Error> {
-        Revision {
-            first: None,
-            changelog: "".to_owned(),
-            previous: None,
-            spends: vec![],
-            creates: pay_to
-                .into_iter()
-                .map(|(to, amount)| PaymentTo { to, amount })
-                .collect(),
-            reference,
-            typ: Type::Deposit,
-            tags: Vec::new(),
-            status,
-            created_at: Utc::now(),
-            updated_at: Utc::now(),
-        }
-        .try_into()
-    }
-
-    /// Returns the previous revision of this transaction, if any
-    pub fn previous(&self) -> Option<RevisionId> {
-        self.inner.previous.clone()
-    }
-
-    /// Returns a unique list of accounts involved in this transaction.
-    ///
-    /// Accounts are sorted and unique, and they include the accounts that spent and that receives
-    pub fn accounts(&self) -> Vec<AccountId> {
-        let mut accounts = self
-            .inner
-            .creates
-            .iter()
-            .map(|x| x.to.clone())
-            .collect::<Vec<_>>();
-
-        accounts.extend(
-            self.inner
-                .spends
-                .iter()
-                .map(|x| x.from.clone())
-                .collect::<Vec<_>>(),
-        );
-
-        accounts.sort();
-        accounts.dedup();
-        accounts
+        Self::from_revision(
+            Revision {
+                changelog: "".to_owned(),
+                previous: None,
+                spends: vec![],
+                creates: pay_to
+                    .into_iter()
+                    .map(|(to, amount)| PaymentTo { to, amount })
+                    .collect(),
+                reference,
+                typ: Type::Deposit,
+                tags: Vec::new(),
+                status,
+                created_at: Utc::now(),
+                updated_at: Utc::now(),
+            },
+            None,
+            None,
+        )
     }
 
     /// Creates a new external withdrawal transaction
@@ -248,25 +211,22 @@ impl Transaction {
         status: Status,
         spend: Vec<PaymentFrom>,
     ) -> Result<Transaction, Error> {
-        Revision {
-            first: None,
-            changelog: "".to_owned(),
-            previous: None,
-            spends: spend,
-            creates: vec![],
-            reference,
-            typ: Type::Withdrawal,
-            tags: Vec::new(),
-            status,
-            created_at: Utc::now(),
-            updated_at: Utc::now(),
-        }
-        .try_into()
-    }
-
-    /// Gets the inner transaction
-    pub fn inner(&self) -> &Revision {
-        &self.inner
+        Self::from_revision(
+            Revision {
+                changelog: "".to_owned(),
+                previous: None,
+                spends: spend,
+                creates: vec![],
+                reference,
+                typ: Type::Withdrawal,
+                tags: Vec::new(),
+                status,
+                created_at: Utc::now(),
+                updated_at: Utc::now(),
+            },
+            None,
+            None,
+        )
     }
 
     /// Creates a new transaction
@@ -287,20 +247,74 @@ impl Transaction {
             .map(|(to, amount)| PaymentTo { to, amount })
             .collect();
 
-        Revision {
-            first: None,
-            changelog: "".to_owned(),
-            previous: None,
-            spends,
-            creates: create,
-            reference,
-            typ,
-            tags: Vec::new(),
-            status,
-            created_at: Utc::now(),
-            updated_at: Utc::now(),
-        }
-        .try_into()
+        Self::from_revision(
+            Revision {
+                changelog: "".to_owned(),
+                previous: None,
+                spends,
+                creates: create,
+                reference,
+                typ,
+                tags: Vec::new(),
+                status,
+                created_at: Utc::now(),
+                updated_at: Utc::now(),
+            },
+            None,
+            None,
+        )
+    }
+
+    /// Creates a new transaction object from a given revision
+    pub fn from_revision(
+        revision: Revision,
+        first_revision: Option<RevisionId>,
+        lastest_revision: Option<RevisionId>,
+    ) -> Result<Self, Error> {
+        let revision_id = revision.calculate_id()?;
+        let first_revision = first_revision.unwrap_or_else(|| revision_id.clone());
+        let lastest_revision = lastest_revision.unwrap_or_else(|| revision_id.clone());
+
+        Ok(Transaction {
+            id: first_revision,
+            revision_id,
+            lastest_revision,
+            inner: revision,
+        })
+    }
+
+    /// Gets the inner transaction
+    pub fn inner(&self) -> &Revision {
+        &self.inner
+    }
+
+    /// Returns the previous revision of this transaction, if any
+    pub fn previous(&self) -> Option<RevisionId> {
+        self.inner.previous.clone()
+    }
+
+    /// Returns a unique list of accounts involved in this transaction.
+    ///
+    /// Accounts are sorted and unique, and they include the accounts that spent and that receives
+    pub fn accounts(&self) -> Vec<AccountId> {
+        let mut accounts = self
+            .inner
+            .creates
+            .iter()
+            .map(|x| x.to.clone())
+            .collect::<Vec<_>>();
+
+        accounts.extend(
+            self.inner
+                .spends
+                .iter()
+                .map(|x| x.from.clone())
+                .collect::<Vec<_>>(),
+        );
+
+        accounts.sort();
+        accounts.dedup();
+        accounts
     }
 
     /// Prepares a transaction ammend to update its status.
@@ -324,13 +338,10 @@ impl Transaction {
             .is_valid_transition(&self.inner.status, &new_status)?;
         let mut inner = self.inner;
         inner.changelog = reason;
-        if inner.first.is_none() {
-            inner.first = Some(self.id);
-        }
         inner.updated_at = Utc::now();
-        inner.previous = Some(self.current);
+        inner.previous = Some(self.revision_id);
         inner.status = new_status;
-        let mut x: Transaction = inner.try_into()?;
+        let mut x: Transaction = Transaction::from_revision(inner, Some(self.id), None)?;
         x.persist(config).await?;
         Ok(x)
     }
@@ -339,7 +350,7 @@ impl Transaction {
     pub(crate) fn validate(&self) -> Result<(), Error> {
         let calculated_revision_id = self.inner.calculate_id()?;
 
-        if self.current != calculated_revision_id
+        if self.revision_id != calculated_revision_id
             || (self.inner.previous.is_none() && self.id != calculated_revision_id)
         {
             return Err(Error::InvalidRevisionId(
@@ -404,13 +415,12 @@ impl Transaction {
 
         if let Some(previous_id) = &self.inner.previous {
             // Make sure this update is updating the last revision and the status is not final
-            let current_transaction = batch
-                .get_transaction_with_revision(&self.id, previous_id)
-                .await?;
+            let current_transaction = batch.get_and_lock_transaction(&self.id).await?;
 
             if config.status.is_final(&current_transaction.inner.status)
                 || self.inner.transaction_fingerprint()?
                     != current_transaction.inner.transaction_fingerprint()?
+                || *previous_id != current_transaction.lastest_revision
             {
                 return Err(Error::TransactionUpdatesNotAllowed);
             }