浏览代码

Moving `persist` from the transaction to the ledger struct

Cesar Rodas 10 月之前
父节点
当前提交
4a3954bc6a
共有 4 个文件被更改,包括 183 次插入162 次删除
  1. 0 11
      TODO.md
  2. 157 22
      utxo/src/ledger.rs
  3. 18 13
      utxo/src/storage/mod.rs
  4. 8 116
      utxo/src/transaction/mod.rs

+ 0 - 11
TODO.md

@@ -1,11 +0,0 @@
-- [x] Improve subscription API to detect when the client drops and stop listening
-- [ ] Improve the detection of drop-client reactive, instead of waiting until an attempted delivery fails.
-- [ ] Implement the filter by status
-- [ ] Implement the filter by since / until (this is implemented at the primary key level at the cursor)
-- [ ] Have more tests
-- [ ] Move the broadcasting of events to a different thread
-- [ ] Stop the working thread when the ledger goes out of scope
-
-Bonus
-
-- [ ] Improve the HashMap of Filter to use the filter much more efficiently with the primary key (similar to the cursor)

+ 157 - 22
utxo/src/ledger.rs

@@ -1,7 +1,14 @@
 use crate::{
-    amount::AmountCents, broadcaster::Broadcaster, config::Config, status::StatusManager,
-    storage::Storage, transaction::Type, worker::WorkerManager, AccountId, Amount, Error, Filter,
-    PaymentFrom, PaymentId, RevId, Status, Tag, Transaction, TxId,
+    amount::AmountCents,
+    broadcaster::Broadcaster,
+    config::Config,
+    status::{InternalStatus, StatusManager},
+    storage::{Batch, ReceivedPaymentStatus, Storage},
+    transaction::Error as TxError,
+    transaction::Type,
+    worker::WorkerManager,
+    AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
+    TxId,
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 use tokio::sync::mpsc::Receiver;
@@ -174,11 +181,138 @@ where
         Ok((exchange_tx, payments))
     }
 
-    /// TODO: Move the whole logic of persisting the transaction and the revision into this layer,
-    /// instead of having them at the transaction layer
-    async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
-        transaction.persist(&self.config).await?;
+    #[inline]
+    /// Persist a new base transaction
+    ///
+    /// This operation should only happen once, because if it is executed multiple times the storage
+    /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
+    async fn store_base_transaction(
+        transaction: &Transaction,
+        batch: &mut S::Batch<'_>,
+    ) -> Result<(), Error> {
+        let spends = transaction
+            .spends
+            .iter()
+            .map(|x| x.id.clone())
+            .collect::<Vec<_>>();
+        batch
+            .spend_payments(
+                &transaction.revision.transaction_id,
+                spends,
+                ReceivedPaymentStatus::Locked,
+            )
+            .await?;
+        batch
+            .create_payments(
+                &transaction.revision.transaction_id,
+                &transaction.creates,
+                ReceivedPaymentStatus::Locked,
+            )
+            .await?;
+
+        for account in transaction.accounts() {
+            batch
+                .relate_account_to_transaction(
+                    &transaction.revision.transaction_id,
+                    &account,
+                    transaction.typ,
+                )
+                .await?;
+        }
+        batch
+            .store_base_transaction(
+                &transaction.revision.transaction_id,
+                &transaction.transaction,
+            )
+            .await?;
+        Ok(())
+    }
+
+    /// Stores the current transaction object to the storage layer.
+    ///
+    /// This method is not idempotent, and it will fail if the transaction if the requested update
+    /// is not allowed.
+    ///
+    /// This function will store the base transaction if it is the first revision, and will create a
+    /// new revision otherwise.
+    pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
+        transaction.validate()?;
+
+        let mut batch = self.config.storage.begin().await?;
+        if transaction.revision.previous.is_none() {
+            Self::store_base_transaction(&transaction, &mut batch).await?;
+        }
+
+        let (created_updated, spent_updated) = match self
+            .config
+            .status
+            .internal_type(&transaction.revision.status)
+        {
+            InternalStatus::Reverted => {
+                batch
+                    .update_transaction_payments(
+                        &transaction.id,
+                        ReceivedPaymentStatus::Failed,
+                        ReceivedPaymentStatus::Spendable,
+                    )
+                    .await?
+            }
+            InternalStatus::Spendable => {
+                batch
+                    .update_transaction_payments(
+                        &transaction.id,
+                        ReceivedPaymentStatus::Spendable,
+                        ReceivedPaymentStatus::Spent,
+                    )
+                    .await?
+            }
+            _ => (transaction.creates.len(), transaction.spends.len()),
+        };
+
+        if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
+        {
+            return Err(Error::Transaction(TxError::NoUpdate));
+        }
+
+        if self
+            .config
+            .status
+            .is_spendable(&transaction.revision.status)
+        {
+            batch
+                .update_transaction_payments(
+                    &transaction.id,
+                    ReceivedPaymentStatus::Spendable,
+                    ReceivedPaymentStatus::Spent,
+                )
+                .await?;
+        }
+
+        batch
+            .store_revision(&transaction.revision_id, &transaction.revision)
+            .await?;
+
+        batch
+            .tag_transaction(
+                &transaction.id,
+                &transaction.transaction,
+                &transaction.revision.tags,
+            )
+            .await?;
+
+        batch
+            .update_transaction_revision(
+                &transaction.id,
+                &transaction.revision_id,
+                transaction.revision.previous.as_ref(),
+            )
+            .await?;
+
+        batch.commit().await?;
+
+        // The transaction is persisted and now it is time to broadcast it to any possible listener
         self.brodcaster.process(transaction.clone());
+
         Ok(transaction)
     }
 
@@ -211,9 +345,9 @@ where
     ) -> Result<Transaction, Error> {
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
         if let Some(change_tx) = change_transaction {
-            self.persist(change_tx).await?;
+            self.store(change_tx).await?;
         }
-        self.persist(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
+        self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
             .await
     }
 
@@ -241,7 +375,7 @@ where
         tags: Vec<Tag>,
         reference: String,
     ) -> Result<Transaction, Error> {
-        self.persist(Transaction::new_external_deposit(
+        self.store(Transaction::new_external_deposit(
             reference,
             status,
             tags,
@@ -267,9 +401,9 @@ where
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .await?;
         for change_tx in change_transactions.into_iter() {
-            self.persist(change_tx).await?;
+            self.store(change_tx).await?;
         }
-        self.persist(Transaction::new_external_withdrawal(
+        self.store(Transaction::new_external_withdrawal(
             reference, status, payments,
         )?)
         .await
@@ -319,7 +453,7 @@ where
             limit: 1,
             ..Default::default()
         };
-        self.persist(
+        self.store(
             self.config
                 .storage
                 .find(filter)
@@ -344,14 +478,15 @@ where
             limit: 1,
             ..Default::default()
         };
-        Ok(self
-            .config
-            .storage
-            .find(filter)
-            .await?
-            .pop()
-            .ok_or(Error::TxNotFound)?
-            .change_status(&self.config, new_status, reason)
-            .await?)
+        self.store(
+            self.config
+                .storage
+                .find(filter)
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?
+                .change_status(&self.config, new_status, reason)?,
+        )
+        .await
     }
 }

+ 18 - 13
utxo/src/storage/mod.rs

@@ -344,7 +344,7 @@ pub mod test {
     macro_rules! storage_test_suite {
         () => {
             $crate::storage_unit_test!(transaction);
-            $crate::storage_unit_test!(transaction_does_not_update_stale_transactions);
+            $crate::storage_unit_test!(transaction_does_not_update_stale_revision);
             $crate::storage_unit_test!(transaction_not_available_until_commit);
             $crate::storage_unit_test!(payments_always_include_negative_amounts);
             $crate::storage_unit_test!(does_not_update_spent_payments);
@@ -357,7 +357,7 @@ pub mod test {
         };
     }
 
-    pub async fn transaction_does_not_update_stale_transactions<T>(storage: T)
+    pub async fn transaction_does_not_update_stale_revision<T>(storage: T)
     where
         T: Storage + Send + Sync,
     {
@@ -366,8 +366,10 @@ pub mod test {
             status: StatusManager::default(),
         };
 
+        let ledger = Ledger::new(config);
+
         let asset: Asset = "USD/2".parse().expect("valid asset");
-        let mut pending = Transaction::new_external_deposit(
+        let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "pending".into(),
             vec![],
@@ -377,20 +379,23 @@ pub mod test {
             )],
         )
         .expect("valid tx");
-        pending.persist(&config).await.expect("valid insert");
 
-        pending
-            .clone()
-            .change_status(&config, "processing".into(), "some text".to_owned())
+        let deposit = ledger.store(deposit).await.expect("valid insert");
+
+        ledger
+            .change_status(
+                deposit.revision_id.clone(),
+                "processing".into(),
+                "some text".to_owned(),
+            )
             .await
-            .expect("valid update");
+            .expect("valid updated");
 
-        pending
+        ledger
             .change_status(
-                &config,
-                "failed".into(),
-                "update from pending to failed (which is not the latest state and should fail)"
-                    .to_owned(),
+                deposit.revision_id.clone(),
+                "processing".into(),
+                "some text".to_owned(),
             )
             .await
             .expect_err("stale updates are rejected by storage");

+ 8 - 116
utxo/src/transaction/mod.rs

@@ -1,9 +1,6 @@
 use crate::{
-    config::Config,
-    payment::PaymentTo,
-    status::InternalStatus,
-    storage::{Batch, ReceivedPaymentStatus, Storage},
-    AccountId, Amount, MaxLengthString, PaymentFrom, RevId, Status, TxId,
+    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, MaxLengthString,
+    PaymentFrom, RevId, Status, TxId,
 };
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
@@ -200,11 +197,11 @@ impl Transaction {
     ///
     /// If the status transaction is not allowed, it will return an error.
     ///
-    /// The new transaction with revision is returned, which is already persisted. The previous
-    /// struct is consumed and the latest revision is preserved for historical purposes but it is no
-    /// longer the latest revision
+    /// The new transaction with revision is returned, which should be persisted. When it is
+    /// persisted, the previous struct is consumed and the latest revision is preserved for
+    /// historical purposes but it is no longer the latest revision
     #[inline]
-    pub async fn change_status<S>(
+    pub fn change_status<S>(
         self,
         config: &Config<S>,
         new_status: Status,
@@ -234,11 +231,11 @@ impl Transaction {
             revision: new_revision,
         };
         new_transaction.revisions.push(revision_id);
-        new_transaction.persist(config).await?;
         Ok(new_transaction)
     }
 
-    fn validate(&self) -> Result<(), Error> {
+    /// Validates the transaction and its revisions
+    pub fn validate(&self) -> Result<(), Error> {
         let rev_id = self.revision.rev_id()?;
         let tx_id = self.transaction.id()?;
         if self.revision_id != rev_id {
@@ -253,109 +250,4 @@ impl Transaction {
         self.transaction.validate()?;
         Ok(())
     }
-
-    #[inline]
-    async fn store_transaction<'a, S>(&mut self, batch: &mut S::Batch<'a>) -> Result<(), Error>
-    where
-        S: Storage + Sync + Send,
-    {
-        self.validate()?;
-        let spends = self.spends.iter().map(|x| x.id.clone()).collect::<Vec<_>>();
-        batch
-            .spend_payments(
-                &self.revision.transaction_id,
-                spends,
-                ReceivedPaymentStatus::Locked,
-            )
-            .await?;
-        batch
-            .create_payments(
-                &self.revision.transaction_id,
-                &self.creates,
-                ReceivedPaymentStatus::Locked,
-            )
-            .await?;
-
-        for account in self.accounts() {
-            batch
-                .relate_account_to_transaction(&self.revision.transaction_id, &account, self.typ)
-                .await?;
-        }
-        batch
-            .store_base_transaction(&self.revision.transaction_id, &self.transaction)
-            .await?;
-        Ok(())
-    }
-
-    /// Persists the changes done to this transaction object.
-    /// This method is not idempotent, and it will fail if the transaction if the requested update
-    /// is not allowed.
-    pub async fn persist<'a, S>(&mut self, config: &'a Config<S>) -> Result<(), Error>
-    where
-        S: Storage + Sync + Send,
-    {
-        self.validate()?;
-        let mut batch = config.storage.begin().await?;
-        if self.revision.previous.is_none() {
-            self.store_transaction::<S>(&mut batch).await?;
-        }
-
-        let (created_updated, spent_updated) =
-            match config.status.internal_type(&self.revision.status) {
-                InternalStatus::Reverted => {
-                    batch
-                        .update_transaction_payments(
-                            &self.id,
-                            ReceivedPaymentStatus::Failed,
-                            ReceivedPaymentStatus::Spendable,
-                        )
-                        .await?
-                }
-                InternalStatus::Spendable => {
-                    batch
-                        .update_transaction_payments(
-                            &self.id,
-                            ReceivedPaymentStatus::Spendable,
-                            ReceivedPaymentStatus::Spent,
-                        )
-                        .await?
-                }
-                _ => (self.creates.len(), self.spends.len()),
-            };
-
-        if self.creates.len() != created_updated || self.spends.len() != spent_updated {
-            return Err(Error::NoUpdate);
-        }
-
-        if config.status.is_spendable(&self.revision.status) {
-            batch
-                .update_transaction_payments(
-                    &self.id,
-                    ReceivedPaymentStatus::Spendable,
-                    ReceivedPaymentStatus::Spent,
-                )
-                .await?;
-        }
-
-        batch
-            .store_revision(&self.revision_id, &self.revision)
-            .await?;
-
-        batch
-            .tag_transaction(&self.id, &self.transaction, &self.revision.tags)
-            .await?;
-
-        batch
-            .update_transaction_revision(
-                &self.id,
-                &self.revision_id,
-                self.revision.previous.as_ref(),
-            )
-            .await
-            .expect("foo3");
-
-        batch.commit().await?;
-
-        Ok(())
-    }
 }