Procházet zdrojové kódy

Implement KVStore traits for wallet database

Add KVStore support to cdk-redb wallet implementation, completing the
KVStore unification across all database backends. This implementation
uses redb's native tuple keys for efficient namespace-based storage.

Changes:
- Add KV_STORE_TABLE definition with composite key (primary_namespace,
  secondary_namespace, key)
- Implement KVStoreTransaction for RedbWalletTransaction with support
  for transactional read, write, remove, and list operations
- Implement KVStoreDatabase for WalletRedbDatabase with non-transactional
  read and list operations
- Implement KVStore trait to provide transaction creation
- Initialize KV_STORE_TABLE during database creation

The implementation follows the same validation and namespace rules as
the SQL-based implementations, ensuring consistency across all database
backends.
Cesar Rodas před 1 měsícem
rodič
revize
a784782919
1 změnil soubory, kde provedl 186 přidání a 7 odebrání
  1. 186 7
      crates/cdk-redb/src/wallet/mod.rs

+ 186 - 7
crates/cdk-redb/src/wallet/mod.rs

@@ -8,7 +8,10 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use cdk_common::common::ProofInfo;
-use cdk_common::database::WalletDatabase;
+use cdk_common::database::{
+    validate_kvstore_params, validate_kvstore_string, DbTransactionFinalizer, KVStore,
+    KVStoreDatabase, KVStoreTransaction, WalletDatabase, WalletDatabaseTransaction,
+};
 use cdk_common::mint_url::MintUrl;
 use cdk_common::util::unix_time;
 use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
@@ -45,6 +48,8 @@ const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_
 const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
 
 const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
+// <(primary_namespace, secondary_namespace, key), value>
+const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
 
 const DATABASE_VERSION: u32 = 4;
 
@@ -180,6 +185,7 @@ impl WalletRedbDatabase {
                         let _ = write_txn.open_table(KEYSET_COUNTER)?;
                         let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
                         let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
+                        let _ = write_txn.open_table(KV_STORE_TABLE)?;
                         table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
                     }
 
@@ -510,17 +516,88 @@ impl WalletDatabase for WalletRedbDatabase {
 
     async fn begin_db_transaction(
         &self,
-    ) -> Result<
-        Box<dyn cdk_common::database::WalletDatabaseTransaction<Self::Err> + Send + Sync>,
-        Self::Err,
-    > {
+    ) -> Result<Box<dyn WalletDatabaseTransaction<Self::Err> + Send + Sync>, Self::Err> {
+        let write_txn = self.db.begin_write().map_err(Error::from)?;
+        Ok(Box::new(RedbWalletTransaction::new(write_txn)))
+    }
+}
+
+#[async_trait]
+impl KVStoreDatabase for WalletRedbDatabase {
+    type Err = database::Error;
+
+    #[instrument(skip_all)]
+    async fn kv_read(
+        &self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+        key: &str,
+    ) -> Result<Option<Vec<u8>>, Self::Err> {
+        // Validate parameters according to KV store requirements
+        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
+
+        let read_txn = self.db.begin_read().map_err(Error::from)?;
+        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        let result = table
+            .get((primary_namespace, secondary_namespace, key))
+            .map_err(Error::from)?
+            .map(|v| v.value().to_vec());
+
+        Ok(result)
+    }
+
+    #[instrument(skip_all)]
+    async fn kv_list(
+        &self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+    ) -> Result<Vec<String>, Self::Err> {
+        // Validate namespace parameters according to KV store requirements
+        validate_kvstore_string(primary_namespace)?;
+        validate_kvstore_string(secondary_namespace)?;
+
+        // Check empty namespace rules
+        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
+            return Err(database::Error::KVStoreInvalidKey(
+                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
+            ));
+        }
+
+        let read_txn = self.db.begin_read().map_err(Error::from)?;
+        let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        let mut keys = Vec::new();
+
+        // Use range iterator for efficient lookup by namespace prefix
+        // Range from (primary, secondary, "") to (primary, secondary, "\u{10FFFF}") to get all keys in this namespace
+        let start = (primary_namespace, secondary_namespace, "");
+        let end = (primary_namespace, secondary_namespace, "\u{10FFFF}");
+
+        for result in table.range(start..=end).map_err(Error::from)? {
+            let (key_tuple, _) = result.map_err(Error::from)?;
+            let (_primary, _secondary, k) = key_tuple.value();
+            keys.push(k.to_string());
+        }
+
+        // Keys are already sorted by the B-tree structure
+        Ok(keys)
+    }
+}
+
+#[async_trait]
+impl KVStore for WalletRedbDatabase {
+    async fn begin_transaction<'a>(
+        &'a self,
+    ) -> Result<Box<dyn KVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, database::Error>
+    {
         let write_txn = self.db.begin_write().map_err(Error::from)?;
         Ok(Box::new(RedbWalletTransaction::new(write_txn)))
     }
 }
 
 #[async_trait]
-impl cdk_common::database::WalletDatabaseTransaction<database::Error> for RedbWalletTransaction {
+impl WalletDatabaseTransaction<database::Error> for RedbWalletTransaction {
     #[instrument(skip(self), fields(keyset_id = %keyset_id))]
     async fn get_keyset_by_id(
         &mut self,
@@ -1012,7 +1089,109 @@ impl cdk_common::database::WalletDatabaseTransaction<database::Error> for RedbWa
 }
 
 #[async_trait]
-impl cdk_common::database::DbTransactionFinalizer for RedbWalletTransaction {
+impl KVStoreTransaction<'_, database::Error> for RedbWalletTransaction {
+    #[instrument(skip_all)]
+    async fn kv_read(
+        &mut self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+        key: &str,
+    ) -> Result<Option<Vec<u8>>, database::Error> {
+        // Validate parameters according to KV store requirements
+        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
+
+        let txn = self.txn()?;
+        let table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        let result = table
+            .get((primary_namespace, secondary_namespace, key))
+            .map_err(Error::from)?
+            .map(|v| v.value().to_vec());
+
+        Ok(result)
+    }
+
+    #[instrument(skip_all)]
+    async fn kv_write(
+        &mut self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+        key: &str,
+        value: &[u8],
+    ) -> Result<(), database::Error> {
+        // Validate parameters according to KV store requirements
+        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
+
+        let txn = self.txn()?;
+        let mut table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        table
+            .insert((primary_namespace, secondary_namespace, key), value)
+            .map_err(Error::from)?;
+
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn kv_remove(
+        &mut self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+        key: &str,
+    ) -> Result<(), database::Error> {
+        // Validate parameters according to KV store requirements
+        validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
+
+        let txn = self.txn()?;
+        let mut table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        table
+            .remove((primary_namespace, secondary_namespace, key))
+            .map_err(Error::from)?;
+
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn kv_list(
+        &mut self,
+        primary_namespace: &str,
+        secondary_namespace: &str,
+    ) -> Result<Vec<String>, database::Error> {
+        // Validate namespace parameters according to KV store requirements
+        validate_kvstore_string(primary_namespace)?;
+        validate_kvstore_string(secondary_namespace)?;
+
+        // Check empty namespace rules
+        if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
+            return Err(database::Error::KVStoreInvalidKey(
+                "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
+            ));
+        }
+
+        let txn = self.txn()?;
+        let table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
+
+        let mut keys = Vec::new();
+
+        // Use range iterator for efficient lookup by namespace prefix
+        // Range from (primary, secondary, "") to (primary, secondary, "\u{10FFFF}") to get all keys in this namespace
+        let start = (primary_namespace, secondary_namespace, "");
+        let end = (primary_namespace, secondary_namespace, "\u{10FFFF}");
+
+        for result in table.range(start..=end).map_err(Error::from)? {
+            let (key_tuple, _) = result.map_err(Error::from)?;
+            let (_primary, _secondary, k) = key_tuple.value();
+            keys.push(k.to_string());
+        }
+
+        // Keys are already sorted by the B-tree structure
+        Ok(keys)
+    }
+}
+
+#[async_trait]
+impl DbTransactionFinalizer for RedbWalletTransaction {
     type Err = database::Error;
 
     async fn commit(mut self: Box<Self>) -> Result<(), database::Error> {