ソースを参照

Invalidate cache

Cesar Rodas 1 年間 前
コミット
712729696f
4 ファイル変更165 行追加7 行削除
  1. 1 1
      TODO.md
  2. 149 0
      utxo/src/cache/batch.rs
  3. 14 5
      utxo/src/cache/mod.rs
  4. 1 1
      utxo/src/storage.rs

+ 1 - 1
TODO.md

@@ -1,5 +1,5 @@
 - [x] Optimize `select_inputs_from_accounts` to return a single change operation instead of a vector
-- [ ] Write cache layer on top of the storage layer, specially if accounts are settled
+- [x] Write cache layer on top of the storage layer, specially if accounts are settled
 - [ ] Improve read performance with SQLite
 - [ ] Add a locking mechanism, to either a start a tx per account, or use the storage engine as a lock mechanism (to lock the utxos)
 - [ ] Add ability to query accounts in a point in time

+ 149 - 0
utxo/src/cache/batch.rs

@@ -0,0 +1,149 @@
+use super::CacheStorage;
+use crate::{
+    changelog::Changelog,
+    storage::{Batch, Error, Storage},
+    transaction::from_db,
+    AccountId, Amount, Payment, PaymentId, Status, Transaction, TransactionId,
+};
+use serde::{de::DeserializeOwned, Serialize};
+use std::{collections::HashMap, marker::PhantomData};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum Ids {
+    Transaction(TransactionId),
+    Payment(PaymentId),
+    Account(AccountId),
+}
+
+pub struct CacheBatch<'a, S>
+where
+    S: Storage<'a>,
+{
+    inner: S::Batch,
+    payments: CacheStorage<PaymentId, Payment>,
+    balances: CacheStorage<AccountId, Vec<Amount>>,
+    transactions: CacheStorage<TransactionId, from_db::Transaction>,
+    to_invalidate: HashMap<Ids, ()>,
+    _phantom: PhantomData<&'a ()>,
+}
+
+impl<'a, S> CacheBatch<'a, S>
+where
+    S: Storage<'a> + Sync + Send,
+{
+    pub fn new(
+        batch: S::Batch,
+        payments: CacheStorage<PaymentId, Payment>,
+        balances: CacheStorage<AccountId, Vec<Amount>>,
+        transactions: CacheStorage<TransactionId, from_db::Transaction>,
+    ) -> Self {
+        Self {
+            inner: batch,
+            payments,
+            balances,
+            transactions,
+            to_invalidate: HashMap::new(),
+            _phantom: PhantomData,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a, S> Batch<'a> for CacheBatch<'a, S>
+where
+    S: Storage<'a> + Sync + Send,
+{
+    async fn commit(self) -> Result<(), Error> {
+        let mut payments = self.payments.write().await;
+        let mut balances = self.balances.write().await;
+        let mut transactions = self.transactions.write().await;
+
+        self.inner.commit().await?;
+
+        for (id, _) in self.to_invalidate {
+            match id {
+                Ids::Transaction(id) => {
+                    transactions.remove(&id);
+                }
+                Ids::Payment(id) => {
+                    payments.remove(&id);
+                }
+                Ids::Account(id) => {
+                    balances.remove(&id);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn rollback(self) -> Result<(), Error> {
+        self.inner.rollback().await
+    }
+
+    async fn store_changelogs<T: DeserializeOwned + Serialize + Send + Sync>(
+        &mut self,
+        changelog: &[Changelog<T>],
+    ) -> Result<(), Error> {
+        self.inner.store_changelogs(changelog).await
+    }
+
+    async fn update_payment(
+        &mut self,
+        payment_id: &PaymentId,
+        spent_by: &TransactionId,
+        spent_status: Status,
+    ) -> Result<(), Error> {
+        self.to_invalidate
+            .insert(Ids::Payment(payment_id.clone()), ());
+        self.to_invalidate
+            .insert(Ids::Transaction(spent_by.clone()), ());
+        self.inner
+            .update_payment(payment_id, spent_by, spent_status)
+            .await
+    }
+
+    async fn get_payment_status(
+        &mut self,
+        transaction_id: &TransactionId,
+    ) -> Result<Option<Status>, Error> {
+        self.to_invalidate
+            .insert(Ids::Transaction(transaction_id.clone()), ());
+        self.inner.get_payment_status(transaction_id).await
+    }
+
+    async fn store_new_payment(&mut self, payment: &Payment) -> Result<(), Error> {
+        self.to_invalidate
+            .insert(Ids::Payment(payment.id.clone()), ());
+        self.inner.store_new_payment(payment).await
+    }
+
+    async fn store_transaction(&mut self, transaction: &Transaction) -> Result<(), Error> {
+        self.to_invalidate
+            .insert(Ids::Transaction(transaction.id().clone()), ());
+        self.inner.store_transaction(transaction).await
+    }
+
+    async fn tag_transaction(
+        &mut self,
+        transaction: &Transaction,
+        tags: &[String],
+    ) -> Result<(), Error> {
+        self.to_invalidate
+            .insert(Ids::Transaction(transaction.id().clone()), ());
+        self.inner.tag_transaction(transaction, tags).await
+    }
+
+    async fn relate_account_to_transaction(
+        &mut self,
+        transaction: &Transaction,
+        account: &AccountId,
+    ) -> Result<(), Error> {
+        self.to_invalidate
+            .insert(Ids::Transaction(transaction.id().clone()), ());
+        self.to_invalidate.insert(Ids::Account(account.clone()), ());
+        self.inner
+            .relate_account_to_transaction(transaction, account)
+            .await
+    }
+}

+ 14 - 5
utxo/src/cache.rs → utxo/src/cache/mod.rs

@@ -10,13 +10,17 @@ use serde::{de::DeserializeOwned, Serialize};
 use std::{collections::HashMap, marker::PhantomData, sync::Arc};
 use tokio::sync::RwLock;
 
+mod batch;
+
+pub(crate) type CacheStorage<K, V> = Arc<RwLock<HashMap<K, V>>>;
+
 pub struct Cache<'a, S>
 where
     S: Storage<'a> + Sync + Send,
 {
-    payments: Arc<RwLock<HashMap<PaymentId, Payment>>>,
-    balances: Arc<RwLock<HashMap<AccountId, Vec<Amount>>>>,
-    transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
+    payments: CacheStorage<PaymentId, Payment>,
+    balances: CacheStorage<AccountId, Vec<Amount>>,
+    transactions: CacheStorage<TransactionId, Transaction>,
     inner: S,
     _phantom: PhantomData<&'a ()>,
 }
@@ -42,10 +46,15 @@ impl<'a, S> Storage<'a> for Cache<'a, S>
 where
     S: Storage<'a> + Sync + Send,
 {
-    type Batch = S::Batch;
+    type Batch = batch::CacheBatch<'a, S>;
 
     async fn begin(&'a self) -> Result<Self::Batch, Error> {
-        self.inner.begin().await
+        Ok(batch::CacheBatch::new(
+            self.inner.begin().await?,
+            self.payments.clone(),
+            self.balances.clone(),
+            self.transactions.clone(),
+        ))
     }
 
     async fn get_payment(&self, id: &PaymentId) -> Result<Payment, Error> {

+ 1 - 1
utxo/src/storage.rs

@@ -116,7 +116,7 @@ pub trait Storage<'a> {
     ///
     /// This batch is needed to perform the changes in the transactions and
     /// payments in a atomic way
-    type Batch: Batch<'a>;
+    type Batch: Batch<'a> + Send;
 
     /// Begins a transaction
     ///