Browse Source

Implement a locking mechanism at the Revision layer

This method is high level and is still secure as it relies in the revision for concurrency.

A proof of locking and update token is attached forever as part of the transaction history
Cesar Rodas 9 months ago
parent
commit
0e65719a4d

+ 1 - 0
Cargo.lock

@@ -3265,6 +3265,7 @@ dependencies = [
  "borsh",
  "chrono",
  "futures",
+ "hmac",
  "parking_lot 0.12.2",
  "rand 0.8.5",
  "serde",

+ 1 - 0
utxo/Cargo.toml

@@ -11,6 +11,7 @@ chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
 hmac = "0.12.1"
 parking_lot = "0.12.2"
+rand = "0.8.5"
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"
 sqlx = { version = "0.7.1", features = [

+ 63 - 49
utxo/src/ledger.rs

@@ -4,13 +4,13 @@ use crate::{
     config::Config,
     status::{InternalStatus, StatusManager},
     storage::{AccountTransactionType, Batch, ReceivedPaymentStatus, Storage},
-    token::{Token, TokenManager},
+    token::TokenManager,
     transaction::{Error as TxError, Type},
     worker::WorkerManager,
     AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
     TxId,
 };
-use std::{cmp::Ordering, collections::HashMap, sync::Arc, time::Duration};
+use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 
 /// The Verax ledger
@@ -250,15 +250,6 @@ where
         Ok(())
     }
 
-    /// TODO
-    pub async fn lock_transaction(&self, transaction_id: &TxId) -> Result<Token, Error> {
-        let token = self
-            .token_manager
-            .new_token(transaction_id, Duration::from_secs(60));
-        self.config.storage.lock_transaction(&token).await?;
-        Ok(token)
-    }
-
     /// Stores the current transaction object to the storage layer.
     ///
     /// This method is not idempotent, and it will fail if the transaction if the requested update
@@ -266,24 +257,34 @@ where
     ///
     /// This function will store the base transaction if it is the first revision, and will create a
     /// new revision otherwise.
-    pub async fn store(
-        &self,
-        transaction: Transaction,
-        update_token: Option<Vec<u8>>,
-    ) -> Result<Transaction, Error> {
+    pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
         transaction.validate()?;
 
-        let update_token = if let Some(update_token) = update_token {
-            Some(self.token_manager.verify(&transaction.id, &update_token)?)
-        } else {
-            None
-        };
+        if let Some(previous) = &transaction.revision.previous {
+            // Although this operation to check the previous version is being performed outside of
+            // the writer batch, it is safe to do so because the previous version is
+            // already stored in the storage layer, and the batch will make sure
+            // the previous version is the current revision, or else the entire operation will be
+            // revered
+            if let Some(lock_token) = self
+                .config
+                .storage
+                .find(Filter {
+                    revisions: vec![previous.clone()],
+                    ..Default::default()
+                })
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?
+                .revision
+                .locked
+            {
+                self.token_manager
+                    .verify(lock_token, &transaction.revision.update_token)?
+            }
+        }
 
         let mut batch = self.config.storage.begin().await?;
-        let stored_update_token = batch.consume_update_token(&transaction.id).await?;
-        if stored_update_token != update_token {
-            return Err(Error::ValidUpdateTokenRequired);
-        }
         if transaction.revision.previous.is_none() {
             Self::store_base_transaction(&transaction, &mut batch).await?;
         }
@@ -390,13 +391,10 @@ where
     ) -> Result<Transaction, Error> {
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
         if let Some(change_tx) = change_transaction {
-            self.store(change_tx, None).await?;
+            self.store(change_tx).await?;
         }
-        self.store(
-            Transaction::new(reference, status, Type::Transaction, payments, to).await?,
-            None,
-        )
-        .await
+        self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
+            .await
     }
 
     /// Return the balances from a given account
@@ -423,15 +421,12 @@ where
         tags: Vec<Tag>,
         reference: String,
     ) -> Result<Transaction, Error> {
-        self.store(
-            Transaction::new_external_deposit(
-                reference,
-                status,
-                tags,
-                vec![(account.clone(), amount)],
-            )?,
-            None,
-        )
+        self.store(Transaction::new_external_deposit(
+            reference,
+            status,
+            tags,
+            vec![(account.clone(), amount)],
+        )?)
         .await
     }
 
@@ -452,12 +447,11 @@ where
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .await?;
         for change_tx in change_transactions.into_iter() {
-            self.store(change_tx, None).await?;
+            self.store(change_tx).await?;
         }
-        self.store(
-            Transaction::new_external_withdrawal(reference, status, payments)?,
-            None,
-        )
+        self.store(Transaction::new_external_withdrawal(
+            reference, status, payments,
+        )?)
         .await
     }
 
@@ -493,12 +487,33 @@ where
         &self.config.status
     }
 
+    /// TODO
+    pub async fn lock_transaction(&self, transaction_id: TxId) -> Result<Vec<u8>, Error> {
+        let filter = Filter {
+            ids: vec![transaction_id.clone()],
+            limit: 1,
+            ..Default::default()
+        };
+
+        let (new_revision, secret) = self
+            .config
+            .storage
+            .find(filter)
+            .await?
+            .pop()
+            .ok_or(Error::TxNotFound)?
+            .lock_transaction(&self.token_manager)?;
+        self.store(new_revision).await?;
+        Ok(secret)
+    }
+
     /// Updates a transaction and updates their tags to this given set
     pub async fn set_tags(
         &self,
         revision_id: RevId,
         tags: Vec<Tag>,
         reason: String,
+        update_token: Option<Vec<u8>>,
     ) -> Result<Transaction, Error> {
         let filter = Filter {
             revisions: vec![revision_id],
@@ -512,8 +527,7 @@ where
                 .await?
                 .pop()
                 .ok_or(Error::TxNotFound)?
-                .set_tags(tags, reason)?,
-            None,
+                .set_tags(tags, reason, update_token)?,
         )
         .await
     }
@@ -525,6 +539,7 @@ where
         revision_id: RevId,
         new_status: Status,
         reason: String,
+        update_token: Option<Vec<u8>>,
     ) -> Result<Transaction, Error> {
         let filter = Filter {
             revisions: vec![revision_id],
@@ -538,8 +553,7 @@ where
                 .await?
                 .pop()
                 .ok_or(Error::TxNotFound)?
-                .change_status(&self.config, new_status, reason)?,
-            None,
+                .change_status(&self.config, new_status, reason, update_token)?,
         )
         .await
     }

+ 6 - 0
utxo/src/storage/mod.rs

@@ -407,6 +407,7 @@ pub mod test {
                 deposit.revision_id.clone(),
                 "processing".into(),
                 "some text".to_owned(),
+                None,
             )
             .await
             .expect("valid updated");
@@ -416,6 +417,7 @@ pub mod test {
                 deposit.revision_id.clone(),
                 "processing".into(),
                 "some text".to_owned(),
+                None,
             )
             .await
             .expect_err("stale updates are rejected by storage");
@@ -667,6 +669,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -679,6 +682,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -762,6 +766,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -774,6 +779,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");

+ 22 - 6
utxo/src/tests/deposit.rs

@@ -26,7 +26,7 @@ async fn pending_deposit_and_failure() {
         .is_empty());
 
     ledger
-        .change_status(rev_id, "failed".into(), "failed due test".to_owned())
+        .change_status(rev_id, "failed".into(), "failed due test".to_owned(), None)
         .await
         .expect("valid tx");
 
@@ -131,7 +131,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(rev_id, "settled".into(), "ready".to_owned())
+        .change_status(rev_id, "settled".into(), "ready".to_owned(), None)
         .await
         .expect("valid tx");
 
@@ -188,7 +188,12 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(
+            rev_id,
+            "cancelled".into(),
+            "cancelled by test".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx");
 
@@ -239,7 +244,12 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     let new_rev_id = ledger
-        .change_status(rev_id, "processing".into(), "processing now".to_owned())
+        .change_status(
+            rev_id,
+            "processing".into(),
+            "processing now".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx")
         .revision_id;
@@ -257,7 +267,8 @@ async fn balance_decreases_while_pending_spending_and_failed() {
             .change_status(
                 new_rev_id.clone(),
                 "cancelled".into(),
-                "cancelled by user".to_owned()
+                "cancelled by user".to_owned(),
+                None,
             )
             .await
             .unwrap_err()
@@ -272,7 +283,12 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(new_rev_id, "failed".into(), "it has failed".to_owned())
+        .change_status(
+            new_rev_id,
+            "failed".into(),
+            "it has failed".to_owned(),
+            None,
+        )
         .await
         .expect("valid");
 

+ 6 - 1
utxo/src/tests/withdrawal.rs

@@ -198,7 +198,12 @@ async fn cancelled_withdrawal() {
     );
 
     ledger
-        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(
+            rev_id,
+            "cancelled".into(),
+            "cancelled by test".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx");
 

+ 33 - 47
utxo/src/token.rs

@@ -1,20 +1,19 @@
 //! Lock Token module
-use crate::TxId;
 use borsh::{BorshDeserialize, BorshSerialize};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, Duration, Utc};
 use hmac::{Hmac, Mac};
+use rand::Rng;
 use serde::{Deserialize, Serialize};
 use sha2::Sha256;
-use std::{ops::Deref, time::Duration};
 
 type HmacSha256 = Hmac<Sha256>;
 
 #[derive(thiserror::Error, Debug, Serialize)]
 /// Error type
 pub enum Error {
-    /// The token is expired
-    #[error("Expired token")]
-    Expired,
+    /// Missing update token
+    #[error("Missing update token")]
+    MissingUpdateToken,
 
     /// Invalid signature
     #[error("Invalid signature")]
@@ -31,27 +30,37 @@ pub enum Error {
 pub struct TokenManager(pub Vec<u8>);
 
 impl TokenManager {
-    /// Creates a new instance `Token` from a vector of bytes,
-    ///
-    /// The token is verified to be valid and to be still not expired before it is being returned.
-    pub fn verify(&self, transaction_id: &TxId, token: &[u8]) -> Result<Token, Error> {
-        let token: Token = borsh::BorshDeserialize::try_from_slice(token)?;
+    /// Checks if the given token is valid and still not expired
+    pub fn verify(&self, token: Token, update_token: &Option<Vec<u8>>) -> Result<(), Error> {
+        if !token.is_valid() {
+            return Ok(());
+        }
+
+        let update_token = if let Some(update_token) = update_token {
+            update_token
+        } else {
+            return Err(Error::MissingUpdateToken);
+        };
 
         let mut mac = HmacSha256::new_from_slice(&self.0).expect("HMAC can take key of any size");
-        mac.update(&token.inner.to_bytes().unwrap_or_default());
+        mac.update(&token.to_bytes().unwrap_or_default());
         let result = mac.finalize().into_bytes();
-        if &result[..] != token.proof || token.inner.transaction != *transaction_id {
+        if &result[..] != update_token {
             Err(Error::InvalidSignature)
         } else {
-            token.is_valid().map(|_| token)
+            Ok(())
         }
     }
 
     /// Creates a new instance of the token
-    pub fn new_token(&self, transaction: &TxId, duration: Duration) -> Token {
-        let token = InnerToken {
-            transaction: transaction.clone(),
+    pub fn new_token(&self, duration: Duration) -> (Token, Vec<u8>) {
+        let mut rng = rand::thread_rng();
+        let mut payload = [0u8; 10];
+        rng.fill(&mut payload);
+
+        let token = Token {
             owner: "owner".to_string(),
+            payload: payload.to_vec(),
             expires_at: Utc::now() + duration,
         };
         let token_bytes = token.to_bytes().unwrap_or_default();
@@ -59,56 +68,33 @@ impl TokenManager {
         mac.update(&token_bytes);
         let proof = mac.finalize().into_bytes();
 
-        Token {
-            inner: token,
-            proof: proof.to_vec(),
-        }
-    }
-}
-
-#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Debug, PartialEq)]
-/// Token to lock a transaction for a period of time
-pub struct Token {
-    #[serde(flatten)]
-    inner: InnerToken,
-    proof: Vec<u8>,
-}
-
-impl Deref for Token {
-    type Target = InnerToken;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
+        (token, proof.to_vec())
     }
 }
 
-#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Debug, PartialEq)]
+#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq)]
 /// Inner token
 ///
 /// This token has the transaction ID, the expiration date and the owner string. This is the data to
 /// be signed by the HMAC and sent to the client.
-pub struct InnerToken {
-    transaction: TxId,
+pub struct Token {
     #[borsh(
         serialize_with = "crate::to_ts_microseconds",
         deserialize_with = "crate::from_ts_microseconds"
     )]
     expires_at: DateTime<Utc>,
+    payload: Vec<u8>,
     owner: String,
 }
 
-impl InnerToken {
+impl Token {
     /// Converts the token to bytes
     pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
         Ok(borsh::to_vec(&self)?)
     }
 
     /// Checks if the token is valid
-    pub fn is_valid(&self) -> Result<(), Error> {
-        if self.expires_at < Utc::now() {
-            Err(Error::Expired)
-        } else {
-            Ok(())
-        }
+    pub fn is_valid(&self) -> bool {
+        self.expires_at > Utc::now()
     }
 }

+ 43 - 4
utxo/src/transaction/mod.rs

@@ -1,8 +1,8 @@
 use crate::{
-    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, FilterableValue,
-    MaxLengthString, PaymentFrom, RevId, Status, TxId,
+    config::Config, payment::PaymentTo, storage::Storage, token::TokenManager, AccountId, Amount,
+    FilterableValue, MaxLengthString, PaymentFrom, RevId, Status, TxId,
 };
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Duration, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
 use std::ops::Deref;
 
@@ -198,11 +198,47 @@ impl Transaction {
         })
     }
 
+    /// TODO:
+    pub fn lock_transaction(self, token_manager: &TokenManager) -> Result<(Self, Vec<u8>), Error> {
+        let (update_token, secret) = token_manager.new_token(Duration::hours(1));
+        let new_revision = Revision {
+            transaction_id: self.revision.transaction_id,
+            changelog: "Lock transaction for updates".to_owned(),
+            locked: Some(update_token),
+            update_token: None,
+            previous: Some(self.revision_id),
+            tags: self.revision.tags,
+            status: self.revision.status,
+            created_at: Utc::now(),
+        };
+        let revision_id = new_revision.rev_id()?;
+        let mut revisions = self.revisions;
+        revisions.push(revision_id.clone());
+
+        Ok((
+            Transaction {
+                id: self.id,
+                revisions,
+                revision_id,
+                transaction: self.transaction,
+                revision: new_revision,
+            },
+            secret,
+        ))
+    }
+
     /// Updates the transaction tags
-    pub fn set_tags(self, new_tags: Vec<Tag>, reason: String) -> Result<Self, Error> {
+    pub fn set_tags(
+        self,
+        new_tags: Vec<Tag>,
+        reason: String,
+        update_token: Option<Vec<u8>>,
+    ) -> Result<Self, Error> {
         let new_revision = Revision {
             transaction_id: self.revision.transaction_id,
             changelog: reason,
+            locked: None,
+            update_token,
             previous: Some(self.revision_id),
             tags: new_tags,
             status: self.revision.status,
@@ -234,6 +270,7 @@ impl Transaction {
         config: &Config<S>,
         new_status: Status,
         reason: String,
+        update_token: Option<Vec<u8>>,
     ) -> Result<Self, Error>
     where
         S: Storage + Sync + Send,
@@ -248,6 +285,8 @@ impl Transaction {
             tags: self.revision.tags,
             status: new_status,
             created_at: Utc::now(),
+            locked: None,
+            update_token,
         };
         let revision_id = new_revision.rev_id()?;
         let mut revisions = self.revisions;

+ 21 - 1
utxo/src/transaction/revision.rs

@@ -1,4 +1,4 @@
-use crate::{transaction::Error, RevId, Status, Tag, TxId};
+use crate::{token::Token, transaction::Error, RevId, Status, Tag, TxId};
 use chrono::{serde::ts_milliseconds, DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
@@ -23,6 +23,24 @@ pub struct Revision {
     /// Transaction status
     pub status: Status,
 
+    /// If set, the next update must include this token's signature; otherwise, updates will not be
+    /// available until the token expires.
+    ///
+    /// This is a straightforward and user-friendly mechanism to lock transactions for further
+    /// updates. It operates without needing any external semaphore-like system to coordinate
+    /// updates, making it easy to understand and manage.
+    ///
+    /// The database plays a crucial role in ensuring correct concurrencies with its revision
+    /// design. It allows only one update to succeed, making all others stale updates. This forces
+    /// clients of failed updates to fetch the latest revision and re-submit their updates.
+    #[serde(rename = "_locked_by", skip_serializing_if = "Option::is_none")]
+    pub locked: Option<Token>,
+
+    #[serde(rename = "_update_token", skip_serializing_if = "Option::is_none")]
+    /// If the previous revision has an update token, in order to update the update_token or secret
+    /// must be provided, and it is attached to the revision.
+    pub update_token: Option<Vec<u8>>,
+
     #[serde(rename = "updated_at", with = "ts_milliseconds")]
     #[borsh(
         serialize_with = "super::to_ts_microseconds",
@@ -45,6 +63,8 @@ impl Revision {
             transaction_id,
             previous,
             changelog,
+            locked: None,
+            update_token: None,
             tags,
             status,
             created_at: Utc::now(),