|
@@ -0,0 +1,234 @@
|
|
|
+use crate::{
|
|
|
+ amount::AmountCents,
|
|
|
+ asset::AssetId,
|
|
|
+ changelog::Changelog,
|
|
|
+ storage::{Batch, Error, Storage},
|
|
|
+ transaction::from_db::Transaction,
|
|
|
+ AccountId, Amount, Payment, PaymentId, TransactionId, Type,
|
|
|
+};
|
|
|
+use serde::{de::DeserializeOwned, Serialize};
|
|
|
+use std::{collections::HashMap, marker::PhantomData, sync::Arc};
|
|
|
+use tokio::sync::RwLock;
|
|
|
+
|
|
|
+pub struct Cache<'a, S>
|
|
|
+where
|
|
|
+ S: Storage<'a> + Sync + Send,
|
|
|
+{
|
|
|
+ payments: Arc<RwLock<HashMap<PaymentId, Payment>>>,
|
|
|
+ balances: Arc<RwLock<HashMap<AccountId, Vec<Amount>>>>,
|
|
|
+ transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
|
|
|
+ inner: S,
|
|
|
+ _phantom: PhantomData<&'a ()>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<'a, S> Cache<'a, S>
|
|
|
+where
|
|
|
+ S: Storage<'a> + Sync + Send,
|
|
|
+{
|
|
|
+ pub fn new(storage: S) -> Self {
|
|
|
+ Self {
|
|
|
+ payments: Arc::new(RwLock::new(HashMap::new())),
|
|
|
+ balances: Arc::new(RwLock::new(HashMap::new())),
|
|
|
+ transactions: Arc::new(RwLock::new(HashMap::new())),
|
|
|
+ inner: storage,
|
|
|
+ _phantom: PhantomData,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[async_trait::async_trait]
|
|
|
+impl<'a, S> Storage<'a> for Cache<'a, S>
|
|
|
+where
|
|
|
+ S: Storage<'a> + Sync + Send,
|
|
|
+{
|
|
|
+ type Batch = S::Batch;
|
|
|
+
|
|
|
+ async fn begin(&'a self) -> Result<Self::Batch, Error> {
|
|
|
+ self.inner.begin().await
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_payment(&self, id: &PaymentId) -> Result<Payment, Error> {
|
|
|
+ let payments = self.payments.read().await;
|
|
|
+ if let Some(payment) = payments.get(id).cloned() {
|
|
|
+ Ok(payment)
|
|
|
+ } else {
|
|
|
+ drop(payments);
|
|
|
+ let result = self.inner.get_payment(id).await?;
|
|
|
+ self.payments
|
|
|
+ .write()
|
|
|
+ .await
|
|
|
+ .insert(id.clone(), result.clone());
|
|
|
+ Ok(result)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_balance(&self, account_id: &AccountId) -> Result<Vec<Amount>, Error> {
|
|
|
+ let cache = self.balances.read().await;
|
|
|
+ if let Some(balances) = cache.get(account_id).cloned() {
|
|
|
+ Ok(balances)
|
|
|
+ } else {
|
|
|
+ drop(cache);
|
|
|
+ let result = self.inner.get_balance(account_id).await?;
|
|
|
+ self.balances
|
|
|
+ .write()
|
|
|
+ .await
|
|
|
+ .insert(account_id.clone(), result.clone());
|
|
|
+ Ok(result)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_transaction(&self, id: &TransactionId) -> Result<Transaction, Error> {
|
|
|
+ let transactions = self.transactions.read().await;
|
|
|
+ if let Some(transaction) = transactions.get(id).cloned() {
|
|
|
+ Ok(transaction)
|
|
|
+ } else {
|
|
|
+ drop(transactions);
|
|
|
+ let result = self.inner.get_transaction(id).await?;
|
|
|
+ self.transactions
|
|
|
+ .write()
|
|
|
+ .await
|
|
|
+ .insert(id.clone(), result.clone());
|
|
|
+ Ok(result)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_changelogs<T: DeserializeOwned + Serialize + Send + Sync>(
|
|
|
+ &self,
|
|
|
+ object_id: Vec<u8>,
|
|
|
+ ) -> Result<Vec<Changelog<T>>, Error> {
|
|
|
+ self.inner.get_changelogs(object_id).await
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_unspent_payments(
|
|
|
+ &self,
|
|
|
+ account: &AccountId,
|
|
|
+ asset: AssetId,
|
|
|
+ target_amount: AmountCents,
|
|
|
+ ) -> Result<Vec<Payment>, Error> {
|
|
|
+ self.inner
|
|
|
+ .get_unspent_payments(account, asset, target_amount)
|
|
|
+ .await
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn get_transactions(
|
|
|
+ &self,
|
|
|
+ account: &AccountId,
|
|
|
+ types: &[Type],
|
|
|
+ tags: &[String],
|
|
|
+ ) -> Result<Vec<Transaction>, Error> {
|
|
|
+ self.inner.get_transactions(account, types, tags).await
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[cfg(test)]
|
|
|
+mod test {
|
|
|
+ use super::*;
|
|
|
+ use crate::{
|
|
|
+ storage_test_suite, tests::deposit, AssetDefinition, AssetManager, Ledger, SQLite, Status,
|
|
|
+ };
|
|
|
+ use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
|
|
+ use std::{fs::remove_file, path::Path};
|
|
|
+
|
|
|
+ async fn get_instance(assets: AssetManager, test: &str) -> Cache<'static, SQLite<'static>> {
|
|
|
+ let path = format!("/tmp/cache-{}.db", test);
|
|
|
+ let _ = remove_file(&path);
|
|
|
+ let settings = path
|
|
|
+ .parse::<SqliteConnectOptions>()
|
|
|
+ .expect("valid settings")
|
|
|
+ .create_if_missing(true);
|
|
|
+
|
|
|
+ let pool = SqlitePoolOptions::new()
|
|
|
+ .connect_with(settings)
|
|
|
+ .await
|
|
|
+ .expect("pool");
|
|
|
+
|
|
|
+ let db = SQLite::new(pool, assets);
|
|
|
+ db.setup().await.expect("setup");
|
|
|
+ Cache::new(db)
|
|
|
+ }
|
|
|
+
|
|
|
+ storage_test_suite!();
|
|
|
+
|
|
|
+ pub async fn get_ledger_and_asset_manager() -> (
|
|
|
+ AssetManager,
|
|
|
+ Ledger<'static, Cache<'static, SQLite<'static>>>,
|
|
|
+ ) {
|
|
|
+ let pool = SqlitePoolOptions::new()
|
|
|
+ .max_connections(1)
|
|
|
+ .idle_timeout(None)
|
|
|
+ .max_lifetime(None)
|
|
|
+ .connect(":memory:")
|
|
|
+ .await
|
|
|
+ .expect("pool");
|
|
|
+
|
|
|
+ let assets = AssetManager::new(vec![
|
|
|
+ AssetDefinition::new(1, "BTC", 8),
|
|
|
+ AssetDefinition::new(2, "USD", 4),
|
|
|
+ ]);
|
|
|
+
|
|
|
+ let db = SQLite::new(pool, assets.clone());
|
|
|
+ db.setup().await.expect("setup");
|
|
|
+ (assets.clone(), Ledger::new(Cache::new(db), assets))
|
|
|
+ }
|
|
|
+
|
|
|
+ #[tokio::test]
|
|
|
+ async fn balances_updates() {
|
|
|
+ let source = "account1".parse::<AccountId>().expect("account");
|
|
|
+ let dest = "account2".parse::<AccountId>().expect("account");
|
|
|
+ let fee = "fee".parse::<AccountId>().expect("account");
|
|
|
+ let (assets, ledger) = get_ledger_and_asset_manager().await;
|
|
|
+
|
|
|
+ deposit(
|
|
|
+ &ledger,
|
|
|
+ &source,
|
|
|
+ assets.amount_by_and_cents(2, 1000).expect("amount"),
|
|
|
+ )
|
|
|
+ .await;
|
|
|
+ deposit(
|
|
|
+ &ledger,
|
|
|
+ &source,
|
|
|
+ assets.amount_by_and_cents(2, 2000).expect("amount"),
|
|
|
+ )
|
|
|
+ .await;
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ vec![assets.amount_by_and_cents(2, 3000).expect("amount")],
|
|
|
+ ledger.get_balance(&source).await.expect("balance")
|
|
|
+ );
|
|
|
+
|
|
|
+ ledger
|
|
|
+ .new_transaction(
|
|
|
+ "Exchange one".to_owned(),
|
|
|
+ Status::Settled,
|
|
|
+ vec![(
|
|
|
+ source.clone(),
|
|
|
+ assets.amount_by_and_cents(2, 1300).expect("amount"),
|
|
|
+ )],
|
|
|
+ vec![
|
|
|
+ (
|
|
|
+ dest.clone(),
|
|
|
+ assets.amount_by_and_cents(2, 1250).expect("amount"),
|
|
|
+ ),
|
|
|
+ (
|
|
|
+ fee.clone(),
|
|
|
+ assets.amount_by_and_cents(2, 50).expect("amount"),
|
|
|
+ ),
|
|
|
+ ],
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ .expect("valid tx");
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ vec![assets.amount_by_and_cents(2, 1700).expect("amount")],
|
|
|
+ ledger.get_balance(&source).await.expect("balance")
|
|
|
+ );
|
|
|
+ assert_eq!(
|
|
|
+ vec![assets.amount_by_and_cents(2, 1250).expect("amount")],
|
|
|
+ ledger.get_balance(&dest).await.expect("balance")
|
|
|
+ );
|
|
|
+ assert_eq!(
|
|
|
+ vec![assets.amount_by_and_cents(2, 50).expect("amount")],
|
|
|
+ ledger.get_balance(&fee).await.expect("balance")
|
|
|
+ );
|
|
|
+ }
|
|
|
+}
|