Browse Source

WIP: add token update manager

Cesar Rodas 9 months ago
parent
commit
8cb0336aac

+ 1 - 0
utxo/Cargo.toml

@@ -9,6 +9,7 @@ bech32 = "0.11.0"
 borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
+hmac = "0.12.1"
 parking_lot = "0.12.2"
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"

+ 9 - 1
utxo/src/error.rs

@@ -1,4 +1,4 @@
-use crate::{amount, asset::Asset, storage, transaction, AccountId};
+use crate::{amount, asset::Asset, storage, token, transaction, AccountId};
 use serde::Serialize;
 
 /// The errors that can happen in the Verax crate
@@ -39,4 +39,12 @@ pub enum Error {
     /// The amount is invalid
     #[error("Invalid amount: {0}")]
     InvalidAmount(#[from] amount::Error),
+
+    /// Invalid token
+    #[error("Invalid update token: {0}")]
+    InvalidToken(#[from] token::Error),
+
+    /// Valid update token is required
+    #[error("Valid update token is required")]
+    ValidUpdateTokenRequired,
 }

+ 5 - 0
utxo/src/id/binary.rs

@@ -23,6 +23,11 @@ macro_rules! BinaryId {
             pub fn new(bytes: [u8; 32]) -> Self {
                 Self { bytes }
             }
+
+            /// Returns the raw bytes of the identifier
+            pub fn as_bytes(&self) -> &[u8; 32] {
+                &self.bytes
+            }
         }
 
         impl FromStr for $id {

+ 51 - 17
utxo/src/ledger.rs

@@ -4,13 +4,13 @@ use crate::{
     config::Config,
     status::{InternalStatus, StatusManager},
     storage::{AccountTransactionType, Batch, ReceivedPaymentStatus, Storage},
-    transaction::Error as TxError,
-    transaction::Type,
+    token::{Token, TokenManager},
+    transaction::{Error as TxError, Type},
     worker::WorkerManager,
     AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
     TxId,
 };
-use std::{cmp::Ordering, collections::HashMap, sync::Arc};
+use std::{cmp::Ordering, collections::HashMap, sync::Arc, time::Duration};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 
 /// The Verax ledger
@@ -20,6 +20,7 @@ where
     S: Storage + Sync + Send,
 {
     config: Config<S>,
+    token_manager: TokenManager,
     broadcaster: WorkerManager<Broadcaster>,
 }
 
@@ -31,6 +32,7 @@ where
     pub fn new(config: Config<S>) -> Arc<Self> {
         Arc::new(Self {
             config,
+            token_manager: TokenManager(b"test".to_vec()),
             broadcaster: WorkerManager::new(Broadcaster::default()),
         })
     }
@@ -248,6 +250,15 @@ where
         Ok(())
     }
 
+    /// TODO
+    pub async fn lock_transaction(&self, transaction_id: &TxId) -> Result<Token, Error> {
+        let token = self
+            .token_manager
+            .new_token(transaction_id, Duration::from_secs(60));
+        self.config.storage.lock_transaction(&token).await?;
+        Ok(token)
+    }
+
     /// Stores the current transaction object to the storage layer.
     ///
     /// This method is not idempotent, and it will fail if the transaction if the requested update
@@ -255,10 +266,24 @@ where
     ///
     /// This function will store the base transaction if it is the first revision, and will create a
     /// new revision otherwise.
-    pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
+    pub async fn store(
+        &self,
+        transaction: Transaction,
+        update_token: Option<Vec<u8>>,
+    ) -> Result<Transaction, Error> {
         transaction.validate()?;
 
+        let update_token = if let Some(update_token) = update_token {
+            Some(self.token_manager.verify(&transaction.id, &update_token)?)
+        } else {
+            None
+        };
+
         let mut batch = self.config.storage.begin().await?;
+        let stored_update_token = batch.consume_update_token(&transaction.id).await?;
+        if stored_update_token != update_token {
+            return Err(Error::ValidUpdateTokenRequired);
+        }
         if transaction.revision.previous.is_none() {
             Self::store_base_transaction(&transaction, &mut batch).await?;
         }
@@ -365,10 +390,13 @@ where
     ) -> Result<Transaction, Error> {
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
         if let Some(change_tx) = change_transaction {
-            self.store(change_tx).await?;
+            self.store(change_tx, None).await?;
         }
-        self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
-            .await
+        self.store(
+            Transaction::new(reference, status, Type::Transaction, payments, to).await?,
+            None,
+        )
+        .await
     }
 
     /// Return the balances from a given account
@@ -395,12 +423,15 @@ where
         tags: Vec<Tag>,
         reference: String,
     ) -> Result<Transaction, Error> {
-        self.store(Transaction::new_external_deposit(
-            reference,
-            status,
-            tags,
-            vec![(account.clone(), amount)],
-        )?)
+        self.store(
+            Transaction::new_external_deposit(
+                reference,
+                status,
+                tags,
+                vec![(account.clone(), amount)],
+            )?,
+            None,
+        )
         .await
     }
 
@@ -421,11 +452,12 @@ where
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .await?;
         for change_tx in change_transactions.into_iter() {
-            self.store(change_tx).await?;
+            self.store(change_tx, None).await?;
         }
-        self.store(Transaction::new_external_withdrawal(
-            reference, status, payments,
-        )?)
+        self.store(
+            Transaction::new_external_withdrawal(reference, status, payments)?,
+            None,
+        )
         .await
     }
 
@@ -481,6 +513,7 @@ where
                 .pop()
                 .ok_or(Error::TxNotFound)?
                 .set_tags(tags, reason)?,
+            None,
         )
         .await
     }
@@ -506,6 +539,7 @@ where
                 .pop()
                 .ok_or(Error::TxNotFound)?
                 .change_status(&self.config, new_status, reason)?,
+            None,
         )
         .await
     }

+ 1 - 0
utxo/src/lib.rs

@@ -37,6 +37,7 @@ mod status;
 pub mod storage;
 #[cfg(test)]
 mod tests;
+pub mod token;
 mod transaction;
 mod worker;
 

+ 8 - 0
utxo/src/storage/cache/batch.rs

@@ -2,6 +2,7 @@ use super::Storage;
 use crate::{
     payment::PaymentTo,
     storage::{AccountTransactionType, Batch, Error, ReceivedPaymentStatus},
+    token::Token,
     AccountId, BaseTx, PaymentId, RevId, Revision, Tag, TxId,
 };
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
@@ -66,6 +67,13 @@ where
         Ok(())
     }
 
+    async fn consume_update_token(
+        &mut self,
+        transaction_id: &TxId,
+    ) -> Result<Option<Token>, Error> {
+        self.inner.consume_update_token(transaction_id).await
+    }
+
     async fn relate_account_to_transaction(
         &mut self,
         account_type: AccountTransactionType,

+ 5 - 0
utxo/src/storage/cache/mod.rs

@@ -2,6 +2,7 @@
 use crate::{
     amount::AmountCents,
     storage::{self, Error},
+    token::Token,
     AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, Revision, TxId,
 };
 use std::{collections::HashMap, sync::Arc};
@@ -77,6 +78,10 @@ where
         }
     }
 
+    async fn lock_transaction(&self, token: &Token) -> Result<(), Error> {
+        self.inner.lock_transaction(token).await
+    }
+
     async fn get_revisions(&self, transaction_id: &TxId) -> Result<Vec<RevId>, Error> {
         let cache = self.storage.revisions_by_transactions.read().await;
         if let Some(revisions) = cache.get(transaction_id) {

+ 25 - 3
utxo/src/storage/mod.rs

@@ -1,7 +1,7 @@
 //! Storage layer trait
 use crate::{
-    amount::AmountCents, payment::PaymentTo, transaction::Type, AccountId, Amount, Asset, BaseTx,
-    Filter, PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction, TxId,
+    amount::AmountCents, payment::PaymentTo, token::Token, transaction::Type, AccountId, Amount,
+    Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, Revision, Tag, Transaction, TxId,
 };
 //use chrono::{DateTime, Utc};
 use serde::Serialize;
@@ -160,6 +160,13 @@ pub trait Batch<'a> {
         status: ReceivedPaymentStatus,
     ) -> Result<(), Error>;
 
+    /// Consumes the update token, if any, returning the token.
+    ///
+    /// Update tokens one time only. Expired update tokens should be discarded by the storage engine
+    /// and None should be returned instead.
+    async fn consume_update_token(&mut self, transaction_id: &TxId)
+        -> Result<Option<Token>, Error>;
+
     /// Create a new list of payments
     async fn create_payments(
         &mut self,
@@ -259,6 +266,21 @@ pub trait Storage {
     /// Returns the balances for a given account
     async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error>;
 
+    /// Locks the transaction
+    ///
+    /// Upon success, the client will have an update token, which can be used once. The locked
+    /// transactions can only be updated with their update token; without it, the storage engine
+    /// will refuse to perform any updates. By default, the update tokens have an expiration time.
+    ///
+    /// This is a straightforward and user-friendly mechanism to lock transactions for further
+    /// updates. It operates without the need for any external semaphore-like system to coordinate
+    /// updates, making it easy to understand and manage.
+    ///
+    /// The database ensures correct concurrencies with its revision design. Only one update will
+    /// succeed, and all others will be stale updates, forcing clients of failed updates to fetch
+    /// the latest revision and re-submit their updates.
+    async fn lock_transaction(&self, token: &Token) -> Result<(), Error>;
+
     /// Returns all the negative payments that are unspent by any account
     ///
     /// If any unspent negative deposit exists with the given account and asset, it must be spend in
@@ -400,7 +422,7 @@ pub mod test {
         )
         .expect("valid tx");
 
-        let deposit = ledger.store(deposit).await.expect("valid insert");
+        let deposit = ledger.store(deposit, None).await.expect("valid insert");
 
         ledger
             .change_status(

+ 8 - 0
utxo/src/storage/sqlite/batch.rs

@@ -1,6 +1,7 @@
 use crate::{
     payment::PaymentTo,
     storage::{self, to_bytes, AccountTransactionType, Error, ReceivedPaymentStatus},
+    token::Token,
     AccountId, BaseTx, PaymentId, RevId, Revision, Tag, TxId, Type,
 };
 use sqlx::{Row, Sqlite, Transaction as SqlxTransaction};
@@ -34,6 +35,13 @@ impl<'a> storage::Batch<'a> for Batch<'a> {
             .map_err(|e| Error::Storage(e.to_string()))
     }
 
+    async fn consume_update_token(
+        &mut self,
+        _transaction_id: &TxId,
+    ) -> Result<Option<Token>, Error> {
+        todo!()
+    }
+
     async fn relate_account_to_transaction(
         &mut self,
         account_type: AccountTransactionType,

+ 13 - 0
utxo/src/storage/sqlite/mod.rs

@@ -3,6 +3,7 @@ use super::{AccountTransactionType, Cursor, ReceivedPaymentStatus};
 use crate::{
     amount::AmountCents,
     storage::{Error, Storage},
+    token::Token,
     transaction::Revision,
     AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, PrimaryFilter, RevId, TxId,
 };
@@ -400,6 +401,10 @@ impl SQLite {
         );
         CREATE UNIQUE INDEX IF NOT EXISTS "unique_account_transaction" ON "transaction_accounts" ("account_id", "transaction_id", "relationship");
         CREATE INDEX IF NOT EXISTS "sorted_account_transaction" ON "transaction_accounts" ("account_id", "id" desc);
+        CREATE TABLE IF NOT EXISTS "update_token" (
+            "token" VARCHAR(64) NOT NULL PRIMARY KEY,
+            "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP
+        );
         "#,
         )
         .await?;
@@ -445,6 +450,14 @@ impl Storage for SQLite {
             .map_err(|x| Error::Storage(x.to_string()))
     }
 
+    async fn lock_transaction(&self, token: &Token) -> Result<(), Error> {
+        format!(
+            "SELECT * FROM transactions WHERE transaction_id = '{:?}' FOR UPDATE",
+            token
+        );
+        todo!()
+    }
+
     async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
         let mut conn = self
             .db

+ 114 - 0
utxo/src/token.rs

@@ -0,0 +1,114 @@
+//! Lock Token module
+use crate::TxId;
+use borsh::{BorshDeserialize, BorshSerialize};
+use chrono::{DateTime, Utc};
+use hmac::{Hmac, Mac};
+use serde::{Deserialize, Serialize};
+use sha2::Sha256;
+use std::{ops::Deref, time::Duration};
+
+type HmacSha256 = Hmac<Sha256>;
+
+#[derive(thiserror::Error, Debug, Serialize)]
+/// Error type
+pub enum Error {
+    /// The token is expired
+    #[error("Expired token")]
+    Expired,
+
+    /// Invalid signature
+    #[error("Invalid signature")]
+    InvalidSignature,
+
+    /// I/O error
+    #[error("IO error: {0}")]
+    #[serde(serialize_with = "crate::serialize_error_to_string")]
+    IO(#[from] std::io::Error),
+}
+
+#[derive(Debug)]
+/// Lock Token Manager
+pub struct TokenManager(pub Vec<u8>);
+
+impl TokenManager {
+    /// Creates a new instance `Token` from a vector of bytes,
+    ///
+    /// The token is verified to be valid and to be still not expired before it is being returned.
+    pub fn verify(&self, transaction_id: &TxId, token: &[u8]) -> Result<Token, Error> {
+        let token: Token = borsh::BorshDeserialize::try_from_slice(token)?;
+
+        let mut mac = HmacSha256::new_from_slice(&self.0).expect("HMAC can take key of any size");
+        mac.update(&token.inner.to_bytes().unwrap_or_default());
+        let result = mac.finalize().into_bytes();
+        if &result[..] != token.proof || token.inner.transaction != *transaction_id {
+            Err(Error::InvalidSignature)
+        } else {
+            token.is_valid().map(|_| token)
+        }
+    }
+
+    /// Creates a new instance of the token
+    pub fn new_token(&self, transaction: &TxId, duration: Duration) -> Token {
+        let token = InnerToken {
+            transaction: transaction.clone(),
+            owner: "owner".to_string(),
+            expires_at: Utc::now() + duration,
+        };
+        let token_bytes = token.to_bytes().unwrap_or_default();
+        let mut mac = HmacSha256::new_from_slice(&self.0).expect("HMAC can take key of any size");
+        mac.update(&token_bytes);
+        let proof = mac.finalize().into_bytes();
+
+        Token {
+            inner: token,
+            proof: proof.to_vec(),
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Debug, PartialEq)]
+/// Token to lock a transaction for a period of time
+pub struct Token {
+    #[serde(flatten)]
+    inner: InnerToken,
+    proof: Vec<u8>,
+}
+
+impl Deref for Token {
+    type Target = InnerToken;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Debug, PartialEq)]
+/// Inner token
+///
+/// This token has the transaction ID, the expiration date and the owner string. This is the data to
+/// be signed by the HMAC and sent to the client.
+pub struct InnerToken {
+    transaction: TxId,
+    #[borsh(
+        serialize_with = "crate::to_ts_microseconds",
+        deserialize_with = "crate::from_ts_microseconds"
+    )]
+    expires_at: DateTime<Utc>,
+    owner: String,
+}
+
+impl InnerToken {
+    /// Converts the token to bytes
+    pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
+        Ok(borsh::to_vec(&self)?)
+    }
+
+    /// Checks if the token is valid
+    pub fn is_valid(&self) -> Result<(), Error> {
+        if self.expires_at < Utc::now() {
+            Err(Error::Expired)
+        } else {
+            Ok(())
+        }
+    }
+}