Cesar Rodas 1 жил өмнө
parent
commit
91981dc10a

+ 5 - 2
utxo/src/storage/cache/batch.rs

@@ -75,11 +75,14 @@ where
         Ok(())
     }
 
-    async fn get_transaction(
+    async fn get_transaction_with_revision(
         &mut self,
         transaction_id: &TransactionId,
+        revision_id: &TransactionId,
     ) -> Result<Transaction, Error> {
-        self.inner.get_transaction(transaction_id).await
+        self.inner
+            .get_transaction_with_revision(transaction_id, revision_id)
+            .await
     }
 
     async fn relate_account_to_transaction(

+ 6 - 3
utxo/src/storage/mod.rs

@@ -102,11 +102,14 @@ pub trait Batch<'a> {
     /// The batch is commited into the storage layer and it is consumed.
     async fn commit(self) -> Result<(), Error>;
 
-    /// Gets the last transaction from the Batch, potentially the database implementation should
-    /// lock the transaction record for update to avoid concurrency issues
-    async fn get_transaction(
+    /// Gets a transaction id from the Batch transaction, making sure the current revision is still
+    /// the known one. It is expected this function to be called before any update to the
+    /// transaction, to make sure that only the current revision is updated, therefore this function
+    /// is expected to put some hold on the transaction
+    async fn get_transaction_with_revision(
         &mut self,
         transaction_id: &TransactionId,
+        revision_id: &TransactionId,
     ) -> Result<Transaction, Error>;
 
     /// Flag the given payments as spent by the given transaction.

+ 8 - 5
utxo/src/storage/sqlite/batch.rs

@@ -1,7 +1,7 @@
 use crate::{
     payment::PaymentTo,
     storage::{self, Error, Status},
-    transaction::inner::TransactionInner,
+    transaction::inner::Revision,
     AccountId, PaymentId, Transaction, TransactionId, Type,
 };
 use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
@@ -56,9 +56,10 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
         Ok(())
     }
 
-    async fn get_transaction(
+    async fn get_transaction_with_revision(
         &mut self,
         transaction_id: &TransactionId,
+        revision_id: &TransactionId,
     ) -> Result<Transaction, Error> {
         let row = sqlx::query(
             r#"
@@ -70,9 +71,11 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             WHERE
                 "t"."transaction_id" = ?
                 AND "t"."blob_id" = "b"."id"
+                AND "t"."blob_id" = ?
         "#,
         )
         .bind(transaction_id.to_string())
+        .bind(revision_id.to_string())
         .fetch_one(&mut *self.inner)
         .await
         .map_err(|e| Error::Storage(e.to_string()))?;
@@ -80,7 +83,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
         let encoded = row
             .try_get::<Vec<u8>, usize>(0)
             .map_err(|e| Error::Storage(e.to_string()))?;
-        bincode::deserialize::<TransactionInner>(&encoded)
+        bincode::deserialize::<Revision>(&encoded)
             .map_err(|e| Error::Storage(e.to_string()))?
             .try_into()
             .map_err(|e: crate::transaction::Error| Error::Storage(e.to_string()))
@@ -257,7 +260,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
                 VALUES(?, ?, ?, ?, ?)
             "#,
         )
-        .bind(transaction.revision.to_string())
+        .bind(transaction.current.to_string())
         .bind(transaction.id.to_string())
         .bind(bincode::serialize(transaction.inner()).map_err(|e| Error::Encoding(e.to_string()))?)
         .bind(transaction.status().to_string())
@@ -275,7 +278,7 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             "#,
         )
         .bind(transaction.id.to_string())
-        .bind(transaction.revision.to_string())
+        .bind(transaction.current.to_string())
         .bind(transaction.status().to_string())
         .execute(&mut *self.inner)
         .await

+ 2 - 2
utxo/src/storage/sqlite/mod.rs

@@ -2,7 +2,7 @@
 use crate::{
     amount::AmountCents,
     storage::{Error, Storage},
-    transaction::{inner::TransactionInner, Type},
+    transaction::{inner::Revision, Type},
     AccountId, Amount, Asset, PaymentFrom, PaymentId, Transaction, TransactionId,
 };
 use futures::TryStreamExt;
@@ -258,7 +258,7 @@ impl Storage for SQLite {
             .try_get::<Vec<u8>, usize>(0)
             .map_err(|e| Error::Storage(e.to_string()))?;
 
-        bincode::deserialize::<TransactionInner>(&encoded)
+        bincode::deserialize::<Revision>(&encoded)
             .map_err(|e| Error::Storage(e.to_string()))?
             .try_into()
             .map_err(|e: crate::transaction::Error| Error::Storage(e.to_string()))

+ 24 - 23
utxo/src/transaction/inner.rs

@@ -22,9 +22,9 @@ use std::collections::HashMap;
 /// Since the transaction ID is calculated from the transaction itself, to provide cryptographic
 /// security that its content was not altered.
 #[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct TransactionInner {
+pub struct Revision {
     /// A pointer to the first revision of the transaction.
-    first_revision: Option<TransactionId>,
+    first: Option<TransactionId>,
     /// Any previous transaction that this transaction is replacing.
     previous: Option<TransactionId>,
     /// A human-readable description of the transaction changes.
@@ -43,7 +43,7 @@ pub struct TransactionInner {
     updated_at: DateTime<Utc>,
 }
 
-impl TransactionInner {
+impl Revision {
     pub fn calculate_id(&self) -> Result<TransactionId, Error> {
         let mut hasher = Sha256::new();
         let bytes = bincode::serialize(self)?;
@@ -162,20 +162,20 @@ pub struct Transaction {
     /// The TransactionID is the RevisionID of the first revision of the transaction.
     pub id: TransactionId,
     /// Current Revision ID.
-    pub revision: TransactionId,
+    pub current: TransactionId,
     /// The transaction inner details
     #[serde(flatten)]
-    inner: TransactionInner,
+    inner: Revision,
 }
 
-impl TryFrom<TransactionInner> for Transaction {
+impl TryFrom<Revision> for Transaction {
     type Error = Error;
 
-    fn try_from(inner: TransactionInner) -> Result<Self, Self::Error> {
+    fn try_from(inner: Revision) -> Result<Self, Self::Error> {
         let id = inner.calculate_id()?;
         Ok(Transaction {
-            id: inner.first_revision.clone().unwrap_or_else(|| id.clone()),
-            revision: id,
+            id: inner.first.clone().unwrap_or_else(|| id.clone()),
+            current: id,
             inner,
         })
     }
@@ -192,8 +192,8 @@ impl Transaction {
         status: Status,
         pay_to: Vec<(AccountId, Amount)>,
     ) -> Result<Transaction, Error> {
-        TransactionInner {
-            first_revision: None,
+        Revision {
+            first: None,
             changelog: "".to_owned(),
             previous: None,
             spends: vec![],
@@ -243,8 +243,8 @@ impl Transaction {
         status: Status,
         spend: Vec<PaymentFrom>,
     ) -> Result<Transaction, Error> {
-        TransactionInner {
-            first_revision: None,
+        Revision {
+            first: None,
             changelog: "".to_owned(),
             previous: None,
             spends: spend,
@@ -260,7 +260,7 @@ impl Transaction {
     }
 
     /// Gets the inner transaction
-    pub fn inner(&self) -> &TransactionInner {
+    pub fn inner(&self) -> &Revision {
         &self.inner
     }
 
@@ -282,8 +282,8 @@ impl Transaction {
             .map(|(to, amount)| PaymentTo { to, amount })
             .collect();
 
-        TransactionInner {
-            first_revision: None,
+        Revision {
+            first: None,
             changelog: "".to_owned(),
             previous: None,
             spends,
@@ -319,11 +319,11 @@ impl Transaction {
             .is_valid_transition(&self.inner.status, &new_status)?;
         let mut inner = self.inner;
         inner.changelog = reason;
-        if inner.first_revision.is_none() {
-            inner.first_revision = Some(self.id);
+        if inner.first.is_none() {
+            inner.first = Some(self.id);
         }
         inner.updated_at = Utc::now();
-        inner.previous = Some(self.revision);
+        inner.previous = Some(self.current);
         inner.status = new_status;
         let mut x: Transaction = inner.try_into()?;
         x.persist(config).await?;
@@ -334,7 +334,7 @@ impl Transaction {
     pub(crate) fn validate(&self) -> Result<(), Error> {
         let calculated_revision_id = self.inner.calculate_id()?;
 
-        if self.revision != calculated_revision_id
+        if self.current != calculated_revision_id
             || (self.inner.previous.is_none() && self.id != calculated_revision_id)
         {
             return Err(Error::InvalidTransactionId(
@@ -399,10 +399,11 @@ impl Transaction {
 
         if let Some(previous_id) = &self.inner.previous {
             // Make sure this update is updating the last revision and the status is not final
-            let current_transaction = batch.get_transaction(&self.id).await?;
+            let current_transaction = batch
+                .get_transaction_with_revision(&self.id, previous_id)
+                .await?;
 
-            if current_transaction.revision != *previous_id
-                || config.status.is_final(&current_transaction.inner.status)
+            if config.status.is_final(&current_transaction.inner.status)
                 || self.inner.transaction_fingerprint()?
                     != current_transaction.inner.transaction_fingerprint()?
             {