Преглед изворни кода

Update token (#12)

* WIP: add token update manager

* Has a better idea

The whole locking mechanism will be implemented in a higher level, as the
revision layer.

The idea ise to reuse existing infrastructure/code and to leave a trail of the
locked transactions and their usage

* Implement a locking mechanism at the Revision layer

This method is high level and is still secure as it relies in the revision for concurrency.

A proof of locking and update token is attached forever as part of the transaction history

* Move token_manager to the config

* Moved things around

The payload that was used to sign the Token is needed in order to commit

* Add tests

* Enhance store()

If the status is not changing, do not attempt to update the UTXOs

* Render binary data as base64

* Remove base64 and use the built-in id

* Add API to lock transactions
César D. Rodas пре 9 месеци
родитељ
комит
c673d7b907

+ 1 - 0
Cargo.lock

@@ -3265,6 +3265,7 @@ dependencies = [
  "borsh",
  "chrono",
  "futures",
+ "hmac",
  "parking_lot 0.12.2",
  "rand 0.8.5",
  "serde",

+ 47 - 0
src/lock.rs

@@ -0,0 +1,47 @@
+use crate::{Context, Handler};
+use actix_web::{post, web, HttpResponse, Responder};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use verax::{RevId, TxId};
+
+#[derive(Deserialize)]
+pub struct Lock {
+    id: TxId,
+    client_name: String,
+}
+
+#[derive(Serialize)]
+pub struct Response {
+    id: TxId,
+    #[serde(rename = "_rev")]
+    revision_id: RevId,
+    secret: String,
+}
+
+#[async_trait::async_trait]
+impl Handler for Lock {
+    type Ok = Response;
+    type Err = verax::Error;
+
+    async fn handle(self, ledger: &Context) -> Result<Self::Ok, Self::Err> {
+        let (new_tx, token) = ledger
+            .ledger
+            .lock_transaction(self.id, self.client_name)
+            .await?;
+        Ok(Response {
+            id: new_tx.id,
+            revision_id: new_tx.revision_id,
+            secret: token.to_string(),
+        })
+    }
+}
+
+#[post("/lock")]
+async fn handler(item: web::Json<Lock>, ctx: web::Data<Context>) -> impl Responder {
+    match item.into_inner().handle(&ctx).await {
+        Ok(tx) => HttpResponse::Accepted().json(tx),
+        Err(err) => {
+            HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
+        }
+    }
+}

+ 2 - 0
src/main.rs

@@ -13,6 +13,7 @@ pub trait Handler {
 mod balance;
 mod deposit;
 mod get;
+mod lock;
 mod subscribe;
 mod tx;
 mod update;
@@ -64,6 +65,7 @@ async fn main() -> std::io::Result<()> {
             .service(subscribe::handler)
             .service(deposit::handler)
             .service(balance::handler)
+            .service(lock::handler)
             .service(tx::handler)
             .service(update::handler)
             .service(get::handler)

+ 6 - 3
src/update.rs

@@ -2,7 +2,7 @@ use crate::{Context, Handler};
 use actix_web::{post, web, HttpResponse, Responder};
 use serde::Deserialize;
 use serde_json::json;
-use verax::{RevId, Status, Tag};
+use verax::{RevId, Status, Tag, TokenPayload};
 
 #[derive(Deserialize)]
 pub struct UpdateOperation {
@@ -11,6 +11,7 @@ pub struct UpdateOperation {
     #[serde(default)]
     pub tags: Option<Vec<Tag>>,
     pub memo: String,
+    pub update_token: Option<TokenPayload>,
 }
 
 struct Update {
@@ -26,10 +27,12 @@ impl Handler for Update {
     async fn handle(self, ledger: &Context) -> Result<Self::Ok, Self::Err> {
         let id = self.id;
         let memo = self.operation.memo;
+        let update_token = self.operation.update_token;
+
         let id = if let Some(status) = self.operation.status {
             let transaction = ledger
                 .ledger
-                .change_status(id, status, memo.clone())
+                .change_status(id, status, memo.clone(), update_token.clone())
                 .await?;
             transaction.revision_id
         } else {
@@ -37,7 +40,7 @@ impl Handler for Update {
         };
 
         let id = if let Some(tags) = self.operation.tags {
-            let transaction = ledger.ledger.set_tags(id, tags, memo).await?;
+            let transaction = ledger.ledger.set_tags(id, tags, memo, update_token).await?;
             transaction.revision_id
         } else {
             id

+ 2 - 0
utxo/Cargo.toml

@@ -9,7 +9,9 @@ bech32 = "0.11.0"
 borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
+hmac = "0.12.1"
 parking_lot = "0.12.2"
+rand = "0.8.5"
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"
 sqlx = { version = "0.7.1", features = [

+ 3 - 1
utxo/src/config.rs

@@ -1,4 +1,4 @@
-use crate::{status::StatusManager, storage::Storage};
+use crate::{status::StatusManager, storage::Storage, token::TokenManager};
 
 #[derive(Debug)]
 pub struct Config<S>
@@ -6,6 +6,7 @@ where
     S: Storage + Sync + Send,
 {
     pub storage: S,
+    pub token_manager: TokenManager,
     pub status: StatusManager,
 }
 
@@ -16,6 +17,7 @@ where
     fn from(storage: S) -> Self {
         Self {
             storage,
+            token_manager: TokenManager(b"test".to_vec()),
             status: StatusManager::default(),
         }
     }

+ 13 - 1
utxo/src/error.rs

@@ -1,4 +1,4 @@
-use crate::{amount, asset::Asset, storage, transaction, AccountId};
+use crate::{amount, asset::Asset, status, storage, token, transaction, AccountId};
 use serde::Serialize;
 
 /// The errors that can happen in the Verax crate
@@ -24,6 +24,10 @@ pub enum Error {
     #[error("Storage: {0}")]
     Storage(#[from] storage::Error),
 
+    /// A status error
+    #[error("Status update: {0}")]
+    Status(#[from] status::Error),
+
     /// The asset is not defined
     #[error("Asset {0} is not defined")]
     AssetIdNotFound(Asset),
@@ -39,4 +43,12 @@ pub enum Error {
     /// The amount is invalid
     #[error("Invalid amount: {0}")]
     InvalidAmount(#[from] amount::Error),
+
+    /// Invalid token
+    #[error("Invalid update token: {0}")]
+    InvalidToken(#[from] token::Error),
+
+    /// Valid update token is required
+    #[error("Valid update token is required")]
+    ValidUpdateTokenRequired,
 }

+ 26 - 12
utxo/src/id/binary.rs

@@ -9,6 +9,7 @@ macro_rules! BinaryId {
             PartialOrd,
             Ord,
             Hash,
+            Default,
             PartialEq,
             borsh::BorshSerialize,
             borsh::BorshDeserialize,
@@ -18,18 +19,30 @@ macro_rules! BinaryId {
             bytes: [u8; 32],
         }
 
+        impl From<[u8; 32]> for $id {
+            fn from(bytes: [u8; 32]) -> Self {
+                Self { bytes }
+            }
+        }
+
         impl $id {
             /// Creates a new instance of $id from the raw bytes
             pub fn new(bytes: [u8; 32]) -> Self {
                 Self { bytes }
             }
+
+            /// Returns the raw bytes of the identifier
+            pub fn as_bytes(&self) -> &[u8; 32] {
+                &self.bytes
+            }
         }
 
-        impl FromStr for $id {
-            type Err = Error;
+        impl std::str::FromStr for $id {
+            type Err = crate::id::Error;
             fn from_str(value: &str) -> Result<Self, Self::Err> {
                 Ok(Self::try_from(value).unwrap_or_else(|_| {
-                    let mut hasher = Sha256::new();
+                    use sha2::Digest;
+                    let mut hasher = sha2::Sha256::new();
                     hasher.update(&value);
                     Self {
                         bytes: hasher.finalize().into(),
@@ -50,21 +63,22 @@ macro_rules! BinaryId {
         impl<'de> Deserialize<'de> for $id {
             fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
             where
-                D: Deserializer<'de>,
+                D: serde::Deserializer<'de>,
             {
-                let s = String::deserialize(deserializer)?;
+                use std::str::FromStr;
+                let s = <String as serde::Deserialize>::deserialize(deserializer)?;
                 // Use FromStr to parse the string and construct the struct
                 $id::from_str(&s).map_err(serde::de::Error::custom)
             }
         }
 
         impl TryFrom<&str> for $id {
-            type Error = Error;
+            type Error = crate::id::Error;
             fn try_from(value: &str) -> Result<Self, Self::Error> {
                 let (hrp, bytes) = bech32::decode(&value)?;
                 let hrp = hrp.to_string();
                 if hrp != $suffix {
-                    return Err(Error::InvalidPrefix(
+                    return Err(crate::id::Error::InvalidPrefix(
                         stringify!($id).to_owned(),
                         $suffix.to_owned(),
                         hrp,
@@ -76,11 +90,11 @@ macro_rules! BinaryId {
         }
 
         impl TryFrom<&[u8]> for $id {
-            type Error = Error;
+            type Error = crate::id::Error;
 
             fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
                 if value.len() != 32 {
-                    return Err(Error::InvalidLength(
+                    return Err(crate::id::Error::InvalidLength(
                         stringify!($id).to_owned(),
                         value.len(),
                         32,
@@ -93,11 +107,11 @@ macro_rules! BinaryId {
         }
 
         impl TryFrom<Vec<u8>> for $id {
-            type Error = Error;
+            type Error = crate::id::Error;
 
             fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
                 if value.len() != 32 {
-                    return Err(Error::InvalidLength(
+                    return Err(crate::id::Error::InvalidLength(
                         stringify!($id).to_owned(),
                         value.len(),
                         32,
@@ -109,7 +123,7 @@ macro_rules! BinaryId {
             }
         }
 
-        impl Display for $id {
+        impl std::fmt::Display for $id {
             fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                 let hrp = bech32::Hrp::parse($suffix).map_err(|_| std::fmt::Error)?;
                 write!(

+ 0 - 1
utxo/src/id/mod.rs

@@ -1,5 +1,4 @@
 use serde::{de, Deserialize, Deserializer, Serialize};
-use sha2::{Digest, Sha256};
 use std::{fmt::Display, ops::Deref, str::FromStr};
 
 #[derive(

+ 104 - 39
utxo/src/ledger.rs

@@ -4,8 +4,8 @@ use crate::{
     config::Config,
     status::{InternalStatus, StatusManager},
     storage::{AccountTransactionType, Batch, ReceivedPaymentStatus, Storage},
-    transaction::Error as TxError,
-    transaction::Type,
+    token::TokenPayload,
+    transaction::{Error as TxError, Type},
     worker::WorkerManager,
     AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
     TxId,
@@ -258,54 +258,92 @@ where
     pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
         transaction.validate()?;
 
+        let is_updating_status = if let Some(previous) = &transaction.revision.previous {
+            // Although this operation to check the previous version is being performed outside of
+            // the writer batch, it is safe to do so because the previous version is
+            // already stored in the storage layer, and the batch will make sure
+            // the previous version is the current revision, or else the entire operation will be
+            // revered
+            let previous = self
+                .config
+                .storage
+                .find(Filter {
+                    revisions: vec![previous.clone()],
+                    ..Default::default()
+                })
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?;
+
+            if let Some(lock_token) = previous.revision.locked.as_ref() {
+                self.config
+                    .token_manager
+                    .verify(lock_token.to_owned(), &transaction.revision.update_token)?
+            }
+
+            if previous.revision.status != transaction.revision.status {
+                self.config
+                    .status
+                    .is_valid_transition(&previous.revision.status, &transaction.revision.status)?;
+                true
+            } else {
+                false
+            }
+        } else {
+            true
+        };
+
         let mut batch = self.config.storage.begin().await?;
         if transaction.revision.previous.is_none() {
             Self::store_base_transaction(&transaction, &mut batch).await?;
         }
 
-        let (created_updated, spent_updated) = match self
-            .config
-            .status
-            .internal_type(&transaction.revision.status)
-        {
-            InternalStatus::Reverted => {
-                batch
-                    .update_transaction_payments(
-                        &transaction.id,
-                        ReceivedPaymentStatus::Failed,
-                        ReceivedPaymentStatus::Spendable,
-                    )
-                    .await?
+        if is_updating_status {
+            let (created_updated, spent_updated) = match self
+                .config
+                .status
+                .internal_type(&transaction.revision.status)
+            {
+                InternalStatus::Reverted => {
+                    batch
+                        .update_transaction_payments(
+                            &transaction.id,
+                            ReceivedPaymentStatus::Failed,
+                            ReceivedPaymentStatus::Spendable,
+                        )
+                        .await?
+                }
+                InternalStatus::Spendable => {
+                    batch
+                        .update_transaction_payments(
+                            &transaction.id,
+                            ReceivedPaymentStatus::Spendable,
+                            ReceivedPaymentStatus::Spent,
+                        )
+                        .await?
+                }
+                _ => (transaction.creates.len(), transaction.spends.len()),
+            };
+
+            if transaction.creates.len() != created_updated
+                || transaction.spends.len() != spent_updated
+            {
+                return Err(Error::Transaction(TxError::NoUpdate));
             }
-            InternalStatus::Spendable => {
+
+            if self
+                .config
+                .status
+                .is_spendable(&transaction.revision.status)
+            {
                 batch
                     .update_transaction_payments(
                         &transaction.id,
                         ReceivedPaymentStatus::Spendable,
                         ReceivedPaymentStatus::Spent,
                     )
-                    .await?
+                    .await?;
             }
-            _ => (transaction.creates.len(), transaction.spends.len()),
-        };
-
-        if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
-        {
-            return Err(Error::Transaction(TxError::NoUpdate));
-        }
-
-        if self
-            .config
-            .status
-            .is_spendable(&transaction.revision.status)
-        {
-            batch
-                .update_transaction_payments(
-                    &transaction.id,
-                    ReceivedPaymentStatus::Spendable,
-                    ReceivedPaymentStatus::Spent,
-                )
-                .await?;
         }
 
         batch
@@ -461,12 +499,38 @@ where
         &self.config.status
     }
 
+    /// Locks the transaction and returns a token that can be used to unlock it.
+    ///
+    /// Locked transactions cannot be updated without the TokenPayload or until it expires.
+    pub async fn lock_transaction(
+        &self,
+        transaction_id: TxId,
+        owner: String,
+    ) -> Result<(Transaction, TokenPayload), Error> {
+        let filter = Filter {
+            ids: vec![transaction_id.clone()],
+            limit: 1,
+            ..Default::default()
+        };
+
+        let (new_revision, secret) = self
+            .config
+            .storage
+            .find(filter)
+            .await?
+            .pop()
+            .ok_or(Error::TxNotFound)?
+            .lock_transaction(owner, &self.config.token_manager)?;
+        Ok((self.store(new_revision).await?, secret))
+    }
+
     /// Updates a transaction and updates their tags to this given set
     pub async fn set_tags(
         &self,
         revision_id: RevId,
         tags: Vec<Tag>,
         reason: String,
+        update_token: Option<TokenPayload>,
     ) -> Result<Transaction, Error> {
         let filter = Filter {
             revisions: vec![revision_id],
@@ -480,7 +544,7 @@ where
                 .await?
                 .pop()
                 .ok_or(Error::TxNotFound)?
-                .set_tags(tags, reason)?,
+                .set_tags(tags, reason, update_token)?,
         )
         .await
     }
@@ -492,6 +556,7 @@ where
         revision_id: RevId,
         new_status: Status,
         reason: String,
+        update_token: Option<TokenPayload>,
     ) -> Result<Transaction, Error> {
         let filter = Filter {
             revisions: vec![revision_id],
@@ -505,7 +570,7 @@ where
                 .await?
                 .pop()
                 .ok_or(Error::TxNotFound)?
-                .change_status(&self.config, new_status, reason)?,
+                .change_status(&self.config, new_status, reason, update_token)?,
         )
         .await
     }

+ 2 - 0
utxo/src/lib.rs

@@ -37,6 +37,7 @@ mod status;
 pub mod storage;
 #[cfg(test)]
 mod tests;
+pub mod token;
 mod transaction;
 mod worker;
 
@@ -53,5 +54,6 @@ pub use self::{
     payment::PaymentFrom,
     serde::*,
     status::{Status, StatusManager},
+    token::TokenPayload,
     transaction::*,
 };

+ 82 - 0
utxo/src/storage/mod.rs

@@ -374,15 +374,88 @@ pub mod test {
             $crate::storage_unit_test!(find_transactions_by_status);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
             $crate::storage_unit_test!(subscribe_realtime);
+            $crate::storage_unit_test!(transaction_locking);
         };
     }
 
+    pub async fn transaction_locking<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let config = Config {
+            storage,
+            token_manager: Default::default(),
+            status: StatusManager::default(),
+        };
+
+        let ledger = Ledger::new(config);
+
+        let asset: Asset = "USD/2".parse().expect("valid asset");
+        let deposit = Transaction::new_external_deposit(
+            "test reference".to_owned(),
+            "pending".into(),
+            vec![],
+            vec![(
+                "alice".parse().expect("account"),
+                asset.from_human("100.99").expect("valid amount"),
+            )],
+        )
+        .expect("valid tx");
+
+        let deposit = ledger.store(deposit).await.expect("valid insert");
+
+        let (deposit, update_token) = ledger
+            .lock_transaction(deposit.id, "tester".to_owned())
+            .await
+            .expect("valid locking");
+
+        assert_eq!(
+            "Invalid update token: Missing update token".to_owned(),
+            ledger
+                .change_status(
+                    deposit.revision_id.clone(),
+                    "processing".into(),
+                    "some text".to_owned(),
+                    None,
+                )
+                .await
+                .unwrap_err()
+                .to_string()
+        );
+
+        assert_eq!(
+            "Invalid update token: Invalid signature".to_owned(),
+            ledger
+                .change_status(
+                    deposit.revision_id.clone(),
+                    "processing".into(),
+                    "some text".to_owned(),
+                    Some(Default::default()),
+                )
+                .await
+                .unwrap_err()
+                .to_string()
+        );
+        let new_deposit = ledger
+            .change_status(
+                deposit.revision_id.clone(),
+                "processing".into(),
+                "some text".to_owned(),
+                Some(update_token),
+            )
+            .await
+            .expect("successful update");
+        assert!(new_deposit.revision.locked.is_none());
+        assert!(new_deposit.revision.update_token.is_some());
+    }
+
     pub async fn transaction_does_not_update_stale_revision<T>(storage: T)
     where
         T: Storage + Send + Sync,
     {
         let config = Config {
             storage,
+            token_manager: Default::default(),
             status: StatusManager::default(),
         };
 
@@ -407,6 +480,7 @@ pub mod test {
                 deposit.revision_id.clone(),
                 "processing".into(),
                 "some text".to_owned(),
+                None,
             )
             .await
             .expect("valid updated");
@@ -416,6 +490,7 @@ pub mod test {
                 deposit.revision_id.clone(),
                 "processing".into(),
                 "some text".to_owned(),
+                None,
             )
             .await
             .expect_err("stale updates are rejected by storage");
@@ -631,6 +706,7 @@ pub mod test {
         let ledger = Ledger::new(Config {
             storage,
             status: Default::default(),
+            token_manager: Default::default(),
         });
 
         let (_, mut subscription) = ledger
@@ -667,6 +743,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -679,6 +756,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -735,6 +813,7 @@ pub mod test {
         let ledger = Ledger::new(Config {
             storage,
             status: Default::default(),
+            token_manager: Default::default(),
         });
 
         for i in 0..10 {
@@ -762,6 +841,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -774,6 +854,7 @@ pub mod test {
                             "all".parse().expect("valid tag"),
                         ],
                         "add tags".to_owned(),
+                        None,
                     )
                     .await
                     .expect("tag tx");
@@ -824,6 +905,7 @@ pub mod test {
         let ledger = Ledger::new(Config {
             storage,
             status: Default::default(),
+            token_manager: Default::default(),
         });
 
         for i in 0..10 {

+ 22 - 6
utxo/src/tests/deposit.rs

@@ -26,7 +26,7 @@ async fn pending_deposit_and_failure() {
         .is_empty());
 
     ledger
-        .change_status(rev_id, "failed".into(), "failed due test".to_owned())
+        .change_status(rev_id, "failed".into(), "failed due test".to_owned(), None)
         .await
         .expect("valid tx");
 
@@ -131,7 +131,7 @@ async fn balance_decreases_while_pending_spending_and_confirm() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(rev_id, "settled".into(), "ready".to_owned())
+        .change_status(rev_id, "settled".into(), "ready".to_owned(), None)
         .await
         .expect("valid tx");
 
@@ -188,7 +188,12 @@ async fn balance_decreases_while_pending_spending_and_cancel() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(
+            rev_id,
+            "cancelled".into(),
+            "cancelled by test".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx");
 
@@ -239,7 +244,12 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     let new_rev_id = ledger
-        .change_status(rev_id, "processing".into(), "processing now".to_owned())
+        .change_status(
+            rev_id,
+            "processing".into(),
+            "processing now".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx")
         .revision_id;
@@ -257,7 +267,8 @@ async fn balance_decreases_while_pending_spending_and_failed() {
             .change_status(
                 new_rev_id.clone(),
                 "cancelled".into(),
-                "cancelled by user".to_owned()
+                "cancelled by user".to_owned(),
+                None,
             )
             .await
             .unwrap_err()
@@ -272,7 +283,12 @@ async fn balance_decreases_while_pending_spending_and_failed() {
     assert!(ledger.get_balance(&fee).await.expect("balance").is_empty());
 
     ledger
-        .change_status(new_rev_id, "failed".into(), "it has failed".to_owned())
+        .change_status(
+            new_rev_id,
+            "failed".into(),
+            "it has failed".to_owned(),
+            None,
+        )
         .await
         .expect("valid");
 

+ 6 - 1
utxo/src/tests/withdrawal.rs

@@ -198,7 +198,12 @@ async fn cancelled_withdrawal() {
     );
 
     ledger
-        .change_status(rev_id, "cancelled".into(), "cancelled by test".to_owned())
+        .change_status(
+            rev_id,
+            "cancelled".into(),
+            "cancelled by test".to_owned(),
+            None,
+        )
         .await
         .expect("valid tx");
 

+ 118 - 0
utxo/src/token.rs

@@ -0,0 +1,118 @@
+//! Lock Token module
+use borsh::{BorshDeserialize, BorshSerialize};
+use chrono::{DateTime, Duration, Utc};
+use hmac::{Hmac, Mac};
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use sha2::Sha256;
+use std::ops::Deref;
+
+type HmacSha256 = Hmac<Sha256>;
+
+crate::BinaryId!(TokenPayload, "token");
+crate::BinaryId!(TokenSignature, "sig");
+
+#[derive(thiserror::Error, Debug, Serialize)]
+/// Error type
+pub enum Error {
+    /// Missing update token
+    #[error("Missing update token")]
+    MissingUpdateToken,
+
+    /// Invalid signature
+    #[error("Invalid signature")]
+    InvalidSignature,
+
+    /// I/O error
+    #[error("IO error: {0}")]
+    #[serde(serialize_with = "crate::serialize_error_to_string")]
+    IO(#[from] std::io::Error),
+}
+
+#[derive(Debug)]
+/// Lock Token Manager
+pub struct TokenManager(pub Vec<u8>);
+
+impl Default for TokenManager {
+    fn default() -> Self {
+        let mut rng = rand::thread_rng();
+        let mut payload = [0u8; 10];
+        rng.fill(&mut payload);
+        Self(payload.to_vec())
+    }
+}
+
+impl TokenManager {
+    /// Checks if the given token is valid and still not expired
+    pub fn verify(&self, token: Token, update_token: &Option<TokenPayload>) -> Result<(), Error> {
+        if !token.is_valid() {
+            return Ok(());
+        }
+
+        let update_token = if let Some(update_token) = update_token {
+            update_token
+        } else {
+            return Err(Error::MissingUpdateToken);
+        };
+
+        let mut mac = HmacSha256::new_from_slice(&self.0).expect("HMAC can take key of any size");
+        mac.update(update_token.deref());
+
+        let result = mac.finalize().into_bytes();
+        if &result[..] != *token.signature {
+            Err(Error::InvalidSignature)
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Creates a new instance of the token
+    pub fn new_token(&self, owner: String, duration: Duration) -> (Token, TokenPayload) {
+        let mut rng = rand::thread_rng();
+        let mut payload = [0u8; 32];
+        rng.fill(&mut payload);
+
+        let mut mac = HmacSha256::new_from_slice(&self.0).expect("HMAC can take key of any size");
+        mac.update(&payload);
+
+        let signature: [u8; 32] = mac.finalize().into_bytes().into();
+
+        (
+            // The token cannot be altered once it is commited, as the revision ID is the hash of
+            // the entire content, therefore it is safer to only HMAC the
+            Token {
+                expires_at: Utc::now() + duration,
+                owner,
+                signature: signature.into(),
+            },
+            payload.into(),
+        )
+    }
+}
+
+#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq)]
+/// Inner token
+///
+/// This token has the transaction ID, the expiration date and the owner string. This is the data to
+/// be signed by the HMAC and sent to the client.
+pub struct Token {
+    #[borsh(
+        serialize_with = "crate::to_ts_microseconds",
+        deserialize_with = "crate::from_ts_microseconds"
+    )]
+    expires_at: DateTime<Utc>,
+    signature: TokenSignature,
+    owner: String,
+}
+
+impl Token {
+    /// Converts the token to bytes
+    pub fn to_bytes(&self) -> Result<Vec<u8>, Error> {
+        Ok(borsh::to_vec(&self)?)
+    }
+
+    /// Checks if the token is valid
+    pub fn is_valid(&self) -> bool {
+        self.expires_at > Utc::now()
+    }
+}

+ 53 - 4
utxo/src/transaction/mod.rs

@@ -1,8 +1,11 @@
 use crate::{
-    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, FilterableValue,
-    MaxLengthString, PaymentFrom, RevId, Status, TxId,
+    config::Config,
+    payment::PaymentTo,
+    storage::Storage,
+    token::{TokenManager, TokenPayload},
+    AccountId, Amount, FilterableValue, MaxLengthString, PaymentFrom, RevId, Status, TxId,
 };
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Duration, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
 use std::ops::Deref;
 
@@ -198,11 +201,54 @@ impl Transaction {
         })
     }
 
+    /// Locks a transaction for updates
+    ///
+    /// The transaction cannot be updated without the secret, which is returned in the tuple, or
+    /// until the lock expires.
+    pub fn lock_transaction(
+        self,
+        owner: String,
+        token_manager: &TokenManager,
+    ) -> Result<(Self, TokenPayload), Error> {
+        let (update_token, secret) = token_manager.new_token(owner, Duration::hours(1));
+        let new_revision = Revision {
+            transaction_id: self.revision.transaction_id,
+            changelog: "Lock transaction for updates".to_owned(),
+            locked: Some(update_token),
+            update_token: None,
+            previous: Some(self.revision_id),
+            tags: self.revision.tags,
+            status: self.revision.status,
+            created_at: Utc::now(),
+        };
+        let revision_id = new_revision.rev_id()?;
+        let mut revisions = self.revisions;
+        revisions.push(revision_id.clone());
+
+        Ok((
+            Transaction {
+                id: self.id,
+                revisions,
+                revision_id,
+                transaction: self.transaction,
+                revision: new_revision,
+            },
+            secret,
+        ))
+    }
+
     /// Updates the transaction tags
-    pub fn set_tags(self, new_tags: Vec<Tag>, reason: String) -> Result<Self, Error> {
+    pub fn set_tags(
+        self,
+        new_tags: Vec<Tag>,
+        reason: String,
+        update_token: Option<TokenPayload>,
+    ) -> Result<Self, Error> {
         let new_revision = Revision {
             transaction_id: self.revision.transaction_id,
             changelog: reason,
+            locked: None,
+            update_token,
             previous: Some(self.revision_id),
             tags: new_tags,
             status: self.revision.status,
@@ -234,6 +280,7 @@ impl Transaction {
         config: &Config<S>,
         new_status: Status,
         reason: String,
+        update_token: Option<TokenPayload>,
     ) -> Result<Self, Error>
     where
         S: Storage + Sync + Send,
@@ -248,6 +295,8 @@ impl Transaction {
             tags: self.revision.tags,
             status: new_status,
             created_at: Utc::now(),
+            locked: None,
+            update_token,
         };
         let revision_id = new_revision.rev_id()?;
         let mut revisions = self.revisions;

+ 25 - 1
utxo/src/transaction/revision.rs

@@ -1,4 +1,8 @@
-use crate::{transaction::Error, RevId, Status, Tag, TxId};
+use crate::{
+    token::{Token, TokenPayload},
+    transaction::Error,
+    RevId, Status, Tag, TxId,
+};
 use chrono::{serde::ts_milliseconds, DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
@@ -23,6 +27,24 @@ pub struct Revision {
     /// Transaction status
     pub status: Status,
 
+    /// If set, the next update must include this token's signature; otherwise, updates will not be
+    /// available until the token expires.
+    ///
+    /// This is a straightforward and user-friendly mechanism to lock transactions for further
+    /// updates. It operates without needing any external semaphore-like system to coordinate
+    /// updates, making it easy to understand and manage.
+    ///
+    /// The database plays a crucial role in ensuring correct concurrencies with its revision
+    /// design. It allows only one update to succeed, making all others stale updates. This forces
+    /// clients of failed updates to fetch the latest revision and re-submit their updates.
+    #[serde(rename = "_locked_by", skip_serializing_if = "Option::is_none")]
+    pub locked: Option<Token>,
+
+    #[serde(rename = "_update_token", skip_serializing_if = "Option::is_none")]
+    /// If the previous revision has an update token, in order to update the update_token or secret
+    /// must be provided, and it is attached to the revision.
+    pub update_token: Option<TokenPayload>,
+
     #[serde(rename = "updated_at", with = "ts_milliseconds")]
     #[borsh(
         serialize_with = "super::to_ts_microseconds",
@@ -45,6 +67,8 @@ impl Revision {
             transaction_id,
             previous,
             changelog,
+            locked: None,
+            update_token: None,
             tags,
             status,
             created_at: Utc::now(),