mod.rs 36 KB


  1. //! Redb Wallet
  2. use std::cmp::Ordering;
  3. use std::collections::HashMap;
  4. use std::path::Path;
  5. use std::str::FromStr;
  6. use std::sync::Arc;
  7. use async_trait::async_trait;
  8. use cdk_common::common::ProofInfo;
  9. use cdk_common::database::{validate_kvstore_params, KVStoreDatabase, WalletDatabase};
  10. use cdk_common::mint_url::MintUrl;
  11. use cdk_common::nut00::KnownMethod;
  12. use cdk_common::util::unix_time;
  13. use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
  14. use cdk_common::{
  15. database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
  16. PublicKey, SpendingConditions, State,
  17. };
  18. use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
  19. use tracing::instrument;
  20. use super::error::Error;
  21. use crate::migrations::migrate_00_to_01;
  22. use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
  23. mod migrations;
  // Table layout of the wallet database. Each comment gives <key, value>;
  // `&str` values hold JSON produced with `serde_json` unless noted otherwise.
  24. // <Mint_url, Info>
  25. const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
  26. // <Mint_Url, Keyset_id>
  27. const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
  28. MultimapTableDefinition::new("mint_keysets");
  29. // <Keyset_id, KeysetInfo>
  30. const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
  31. // <Quote_id, quote>
  32. const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
  33. // <Quote_id, quote>
  34. const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
  // <Keyset_id (string form), Keys>
  35. const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
  36. // <Y, Proof Info>
  37. const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
  // <setting name, value>; this file only uses the "db_version" entry
  38. const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
  // <Keyset_id (string form), counter>
  39. const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
  40. // <Transaction_id, Transaction>
  41. const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
  // <u32 form of Keyset_id, Keyset_id (string form)>; used to detect two ids sharing one u32
  42. const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
  43. // <(primary_namespace, secondary_namespace, key), value>
  44. const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
  // Schema version written to CONFIG_TABLE under "db_version"; older files are migrated up to it
  45. const DATABASE_VERSION: u32 = 4;
  46. /// Wallet Redb Database
  47. #[derive(Debug, Clone)]
  48. pub struct WalletRedbDatabase {
  49. db: Arc<Database>,
  50. }
  51. impl WalletRedbDatabase {
  52. /// Create new [`WalletRedbDatabase`]
  53. pub fn new(path: &Path) -> Result<Self, Error> {
  54. {
  55. // Check if parent directory exists before attempting to create database
  56. if let Some(parent) = path.parent() {
  57. if !parent.exists() {
  58. return Err(Error::Io(std::io::Error::new(
  59. std::io::ErrorKind::NotFound,
  60. format!("Parent directory does not exist: {parent:?}"),
  61. )));
  62. }
  63. }
  64. let db = Arc::new(Database::create(path)?);
  65. let db_version: Option<String>;
  66. {
  67. // Check database version
  68. let read_txn = db.begin_read()?;
  69. let table = read_txn.open_table(CONFIG_TABLE);
  70. db_version = match table {
  71. Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
  72. Err(_) => None,
  73. };
  74. }
  75. match db_version {
  76. Some(db_version) => {
  77. let mut current_file_version = u32::from_str(&db_version)?;
  78. tracing::info!("Current file version {}", current_file_version);
  79. match current_file_version.cmp(&DATABASE_VERSION) {
  80. Ordering::Less => {
  81. tracing::info!(
  82. "Database needs to be upgraded at {} current is {}",
  83. current_file_version,
  84. DATABASE_VERSION
  85. );
  86. if current_file_version == 0 {
  87. current_file_version = migrate_00_to_01(Arc::clone(&db))?;
  88. }
  89. if current_file_version == 1 {
  90. current_file_version = migrate_01_to_02(Arc::clone(&db))?;
  91. }
  92. if current_file_version == 2 {
  93. current_file_version = migrate_02_to_03(Arc::clone(&db))?;
  94. }
  95. if current_file_version == 3 {
  96. current_file_version = migrate_03_to_04(Arc::clone(&db))?;
  97. }
  98. if current_file_version != DATABASE_VERSION {
  99. tracing::warn!(
  100. "Database upgrade did not complete at {} current is {}",
  101. current_file_version,
  102. DATABASE_VERSION
  103. );
  104. return Err(Error::UnknownDatabaseVersion);
  105. }
  106. let write_txn = db.begin_write()?;
  107. {
  108. let mut table = write_txn.open_table(CONFIG_TABLE)?;
  109. table
  110. .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
  111. }
  112. write_txn.commit()?;
  113. }
  114. Ordering::Equal => {
  115. tracing::info!("Database is at current version {}", DATABASE_VERSION);
  116. }
  117. Ordering::Greater => {
  118. tracing::warn!(
  119. "Database upgrade did not complete at {} current is {}",
  120. current_file_version,
  121. DATABASE_VERSION
  122. );
  123. return Err(Error::UnknownDatabaseVersion);
  124. }
  125. }
  126. }
  127. None => {
  128. let write_txn = db.begin_write()?;
  129. {
  130. let mut table = write_txn.open_table(CONFIG_TABLE)?;
  131. // Open all tables to init a new db
  132. let _ = write_txn.open_table(MINTS_TABLE)?;
  133. let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
  134. let _ = write_txn.open_table(KEYSETS_TABLE)?;
  135. let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
  136. let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
  137. let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
  138. let _ = write_txn.open_table(PROOFS_TABLE)?;
  139. let _ = write_txn.open_table(KEYSET_COUNTER)?;
  140. let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
  141. let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
  142. let _ = write_txn.open_table(KV_STORE_TABLE)?;
  143. table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
  144. }
  145. write_txn.commit()?;
  146. }
  147. }
  148. drop(db);
  149. }
  150. // Check parent directory again for final database creation
  151. if let Some(parent) = path.parent() {
  152. if !parent.exists() {
  153. return Err(Error::Io(std::io::Error::new(
  154. std::io::ErrorKind::NotFound,
  155. format!("Parent directory does not exist: {parent:?}"),
  156. )));
  157. }
  158. }
  159. let mut db = Database::create(path)?;
  160. db.upgrade()?;
  161. Ok(Self { db: Arc::new(db) })
  162. }
  163. }
  164. #[async_trait]
  165. impl WalletDatabase<database::Error> for WalletRedbDatabase {
  166. #[instrument(skip(self))]
  167. async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, database::Error> {
  168. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  169. let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  170. if let Some(mint_info) = table
  171. .get(mint_url.to_string().as_str())
  172. .map_err(Error::from)?
  173. {
  174. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  175. }
  176. Ok(None)
  177. }
  178. #[instrument(skip(self))]
  179. async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, database::Error> {
  180. let read_txn = self.db.begin_read().map_err(Error::from)?;
  181. let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  182. let mints = table
  183. .iter()
  184. .map_err(Error::from)?
  185. .flatten()
  186. .filter_map(|(mint, mint_info)| {
  187. MintUrl::from_str(mint.value())
  188. .ok()
  189. .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
  190. })
  191. .collect();
  192. Ok(mints)
  193. }
  194. #[instrument(skip(self))]
  195. async fn get_mint_keysets(
  196. &self,
  197. mint_url: MintUrl,
  198. ) -> Result<Option<Vec<KeySetInfo>>, database::Error> {
  199. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  200. let table = read_txn
  201. .open_multimap_table(MINT_KEYSETS_TABLE)
  202. .map_err(Error::from)?;
  203. let keyset_ids = table
  204. .get(mint_url.to_string().as_str())
  205. .map_err(Error::from)?
  206. .flatten()
  207. .map(|k| Id::from_bytes(k.value()))
  208. .collect::<Result<Vec<_>, _>>()?;
  209. let mut keysets = vec![];
  210. let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  211. for keyset_id in keyset_ids {
  212. if let Some(keyset) = keysets_t
  213. .get(keyset_id.to_bytes().as_slice())
  214. .map_err(Error::from)?
  215. {
  216. let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
  217. keysets.push(keyset);
  218. }
  219. }
  220. match keysets.is_empty() {
  221. true => Ok(None),
  222. false => Ok(Some(keysets)),
  223. }
  224. }
  225. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  226. async fn get_keyset_by_id(
  227. &self,
  228. keyset_id: &Id,
  229. ) -> Result<Option<KeySetInfo>, database::Error> {
  230. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  231. let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  232. match table
  233. .get(keyset_id.to_bytes().as_slice())
  234. .map_err(Error::from)?
  235. {
  236. Some(keyset) => {
  237. let keyset: KeySetInfo =
  238. serde_json::from_str(keyset.value()).map_err(Error::from)?;
  239. Ok(Some(keyset))
  240. }
  241. None => Ok(None),
  242. }
  243. }
  244. #[instrument(skip_all)]
  245. async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, database::Error> {
  246. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  247. let table = read_txn
  248. .open_table(MINT_QUOTES_TABLE)
  249. .map_err(Error::from)?;
  250. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  251. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  252. }
  253. Ok(None)
  254. }
  255. #[instrument(skip_all)]
  256. async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, database::Error> {
  257. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  258. let table = read_txn
  259. .open_table(MINT_QUOTES_TABLE)
  260. .map_err(Error::from)?;
  261. Ok(table
  262. .iter()
  263. .map_err(Error::from)?
  264. .flatten()
  265. .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
  266. .collect())
  267. }
  268. async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
  269. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  270. let table = read_txn
  271. .open_table(MINT_QUOTES_TABLE)
  272. .map_err(Error::from)?;
  273. Ok(table
  274. .iter()
  275. .map_err(Error::from)?
  276. .flatten()
  277. .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
  278. .filter(|quote| {
  279. quote.amount_issued == Amount::ZERO
  280. || quote.payment_method == PaymentMethod::Known(KnownMethod::Bolt12)
  281. })
  282. .collect())
  283. }
  284. #[instrument(skip_all)]
  285. async fn get_melt_quote(
  286. &self,
  287. quote_id: &str,
  288. ) -> Result<Option<wallet::MeltQuote>, database::Error> {
  289. let read_txn = self.db.begin_read().map_err(Error::from)?;
  290. let table = read_txn
  291. .open_table(MELT_QUOTES_TABLE)
  292. .map_err(Error::from)?;
  293. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  294. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  295. }
  296. Ok(None)
  297. }
  298. #[instrument(skip_all)]
  299. async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, database::Error> {
  300. let read_txn = self.db.begin_read().map_err(Error::from)?;
  301. let table = read_txn
  302. .open_table(MELT_QUOTES_TABLE)
  303. .map_err(Error::from)?;
  304. Ok(table
  305. .iter()
  306. .map_err(Error::from)?
  307. .flatten()
  308. .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
  309. .collect())
  310. }
  311. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  312. async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
  313. let read_txn = self.db.begin_read().map_err(Error::from)?;
  314. let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  315. if let Some(mint_info) = table
  316. .get(keyset_id.to_string().as_str())
  317. .map_err(Error::from)?
  318. {
  319. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  320. }
  321. Ok(None)
  322. }
  323. #[instrument(skip_all)]
  324. async fn get_proofs(
  325. &self,
  326. mint_url: Option<MintUrl>,
  327. unit: Option<CurrencyUnit>,
  328. state: Option<Vec<State>>,
  329. spending_conditions: Option<Vec<SpendingConditions>>,
  330. ) -> Result<Vec<ProofInfo>, database::Error> {
  331. let read_txn = self.db.begin_read().map_err(Error::from)?;
  332. let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  333. let proofs: Vec<ProofInfo> = table
  334. .iter()
  335. .map_err(Error::from)?
  336. .flatten()
  337. .filter_map(|(_k, v)| {
  338. let mut proof = None;
  339. if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
  340. if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
  341. {
  342. proof = Some(proof_info)
  343. }
  344. }
  345. proof
  346. })
  347. .collect();
  348. Ok(proofs)
  349. }
  350. #[instrument(skip(self, ys))]
  351. async fn get_proofs_by_ys(
  352. &self,
  353. ys: Vec<PublicKey>,
  354. ) -> Result<Vec<ProofInfo>, database::Error> {
  355. if ys.is_empty() {
  356. return Ok(Vec::new());
  357. }
  358. let read_txn = self.db.begin_read().map_err(Error::from)?;
  359. let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  360. let mut proofs = Vec::new();
  361. for y in ys {
  362. if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
  363. let proof_info =
  364. serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
  365. proofs.push(proof_info);
  366. }
  367. }
  368. Ok(proofs)
  369. }
  370. async fn get_balance(
  371. &self,
  372. mint_url: Option<MintUrl>,
  373. unit: Option<CurrencyUnit>,
  374. state: Option<Vec<State>>,
  375. ) -> Result<u64, database::Error> {
  376. // For redb, we still need to fetch all proofs and sum them
  377. // since redb doesn't have SQL aggregation
  378. let proofs = self.get_proofs(mint_url, unit, state, None).await?;
  379. Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
  380. }
  381. #[instrument(skip(self))]
  382. async fn get_transaction(
  383. &self,
  384. transaction_id: TransactionId,
  385. ) -> Result<Option<Transaction>, database::Error> {
  386. let read_txn = self.db.begin_read().map_err(Error::from)?;
  387. let table = read_txn
  388. .open_table(TRANSACTIONS_TABLE)
  389. .map_err(Error::from)?;
  390. if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
  391. return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
  392. }
  393. Ok(None)
  394. }
  395. #[instrument(skip(self))]
  396. async fn list_transactions(
  397. &self,
  398. mint_url: Option<MintUrl>,
  399. direction: Option<TransactionDirection>,
  400. unit: Option<CurrencyUnit>,
  401. ) -> Result<Vec<Transaction>, database::Error> {
  402. let read_txn = self.db.begin_read().map_err(Error::from)?;
  403. let table = read_txn
  404. .open_table(TRANSACTIONS_TABLE)
  405. .map_err(Error::from)?;
  406. let transactions: Vec<Transaction> = table
  407. .iter()
  408. .map_err(Error::from)?
  409. .flatten()
  410. .filter_map(|(_k, v)| {
  411. let mut transaction = None;
  412. if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
  413. if tx.matches_conditions(&mint_url, &direction, &unit) {
  414. transaction = Some(tx)
  415. }
  416. }
  417. transaction
  418. })
  419. .collect();
  420. Ok(transactions)
  421. }
  422. #[instrument(skip(self, added, removed_ys))]
  423. async fn update_proofs(
  424. &self,
  425. added: Vec<ProofInfo>,
  426. removed_ys: Vec<PublicKey>,
  427. ) -> Result<(), database::Error> {
  428. let write_txn = self.db.begin_write().map_err(Error::from)?;
  429. {
  430. let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  431. for proof_info in added.iter() {
  432. table
  433. .insert(
  434. proof_info.y.to_bytes().as_slice(),
  435. serde_json::to_string(&proof_info)
  436. .map_err(Error::from)?
  437. .as_str(),
  438. )
  439. .map_err(Error::from)?;
  440. }
  441. for y in removed_ys.iter() {
  442. table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
  443. }
  444. }
  445. write_txn.commit().map_err(Error::from)?;
  446. Ok(())
  447. }
  448. async fn update_proofs_state(
  449. &self,
  450. ys: Vec<PublicKey>,
  451. state: State,
  452. ) -> Result<(), database::Error> {
  453. let write_txn = self.db.begin_write().map_err(Error::from)?;
  454. {
  455. let mut table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  456. for y in ys {
  457. let y_slice = y.to_bytes();
  458. let proof = table
  459. .get(y_slice.as_slice())
  460. .map_err(Error::from)?
  461. .ok_or(Error::UnknownY)?;
  462. let mut proof_info =
  463. serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
  464. drop(proof);
  465. proof_info.state = state;
  466. table
  467. .insert(
  468. y_slice.as_slice(),
  469. serde_json::to_string(&proof_info)
  470. .map_err(Error::from)?
  471. .as_str(),
  472. )
  473. .map_err(Error::from)?;
  474. }
  475. }
  476. write_txn.commit().map_err(Error::from)?;
  477. Ok(())
  478. }
  479. #[instrument(skip(self))]
  480. async fn add_transaction(&self, transaction: Transaction) -> Result<(), database::Error> {
  481. let id = transaction.id();
  482. let write_txn = self.db.begin_write().map_err(Error::from)?;
  483. {
  484. let mut table = write_txn
  485. .open_table(TRANSACTIONS_TABLE)
  486. .map_err(Error::from)?;
  487. table
  488. .insert(
  489. id.as_slice(),
  490. serde_json::to_string(&transaction)
  491. .map_err(Error::from)?
  492. .as_str(),
  493. )
  494. .map_err(Error::from)?;
  495. }
  496. write_txn.commit().map_err(Error::from)?;
  497. Ok(())
  498. }
  499. #[instrument(skip(self))]
  500. async fn update_mint_url(
  501. &self,
  502. old_mint_url: MintUrl,
  503. new_mint_url: MintUrl,
  504. ) -> Result<(), database::Error> {
  505. let write_txn = self.db.begin_write().map_err(Error::from)?;
  506. // Update proofs table
  507. {
  508. let read_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  509. let proofs: Vec<ProofInfo> = read_table
  510. .iter()
  511. .map_err(Error::from)?
  512. .flatten()
  513. .filter_map(|(_k, v)| {
  514. let proof_info = serde_json::from_str::<ProofInfo>(v.value()).ok()?;
  515. if proof_info.mint_url == old_mint_url {
  516. Some(proof_info)
  517. } else {
  518. None
  519. }
  520. })
  521. .collect();
  522. drop(read_table);
  523. if !proofs.is_empty() {
  524. let mut write_table = write_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  525. for mut proof_info in proofs {
  526. proof_info.mint_url = new_mint_url.clone();
  527. write_table
  528. .insert(
  529. proof_info.y.to_bytes().as_slice(),
  530. serde_json::to_string(&proof_info)
  531. .map_err(Error::from)?
  532. .as_str(),
  533. )
  534. .map_err(Error::from)?;
  535. }
  536. }
  537. }
  538. // Update mint quotes
  539. {
  540. let mut table = write_txn
  541. .open_table(MINT_QUOTES_TABLE)
  542. .map_err(Error::from)?;
  543. let unix_time = unix_time();
  544. let quotes: Vec<MintQuote> = table
  545. .iter()
  546. .map_err(Error::from)?
  547. .flatten()
  548. .filter_map(|(_, quote)| {
  549. let mut q: MintQuote = serde_json::from_str(quote.value()).ok()?;
  550. if q.mint_url == old_mint_url && q.expiry >= unix_time {
  551. q.mint_url = new_mint_url.clone();
  552. Some(q)
  553. } else {
  554. None
  555. }
  556. })
  557. .collect();
  558. for quote in quotes {
  559. table
  560. .insert(
  561. quote.id.as_str(),
  562. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  563. )
  564. .map_err(Error::from)?;
  565. }
  566. }
  567. write_txn.commit().map_err(Error::from)?;
  568. Ok(())
  569. }
  570. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  571. async fn increment_keyset_counter(
  572. &self,
  573. keyset_id: &Id,
  574. count: u32,
  575. ) -> Result<u32, database::Error> {
  576. let write_txn = self.db.begin_write().map_err(Error::from)?;
  577. let new_counter = {
  578. let mut table = write_txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
  579. let current_counter = table
  580. .get(keyset_id.to_string().as_str())
  581. .map_err(Error::from)?
  582. .map(|x| x.value())
  583. .unwrap_or_default();
  584. let new_counter = current_counter + count;
  585. table
  586. .insert(keyset_id.to_string().as_str(), new_counter)
  587. .map_err(Error::from)?;
  588. new_counter
  589. };
  590. write_txn.commit().map_err(Error::from)?;
  591. Ok(new_counter)
  592. }
  593. #[instrument(skip(self))]
  594. async fn add_mint(
  595. &self,
  596. mint_url: MintUrl,
  597. mint_info: Option<MintInfo>,
  598. ) -> Result<(), database::Error> {
  599. let write_txn = self.db.begin_write().map_err(Error::from)?;
  600. {
  601. let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  602. table
  603. .insert(
  604. mint_url.to_string().as_str(),
  605. serde_json::to_string(&mint_info)
  606. .map_err(Error::from)?
  607. .as_str(),
  608. )
  609. .map_err(Error::from)?;
  610. }
  611. write_txn.commit().map_err(Error::from)?;
  612. Ok(())
  613. }
  614. #[instrument(skip(self))]
  615. async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), database::Error> {
  616. let write_txn = self.db.begin_write().map_err(Error::from)?;
  617. {
  618. let mut table = write_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  619. table
  620. .remove(mint_url.to_string().as_str())
  621. .map_err(Error::from)?;
  622. }
  623. write_txn.commit().map_err(Error::from)?;
  624. Ok(())
  625. }
  626. #[instrument(skip(self))]
  627. async fn add_mint_keysets(
  628. &self,
  629. mint_url: MintUrl,
  630. keysets: Vec<KeySetInfo>,
  631. ) -> Result<(), database::Error> {
  632. let write_txn = self.db.begin_write().map_err(Error::from)?;
  633. {
  634. let mut table = write_txn
  635. .open_multimap_table(MINT_KEYSETS_TABLE)
  636. .map_err(Error::from)?;
  637. let mut keysets_table = write_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  638. let mut u32_table = write_txn
  639. .open_table(KEYSET_U32_MAPPING)
  640. .map_err(Error::from)?;
  641. let mut existing_u32 = false;
  642. for keyset in keysets {
  643. // Check if keyset already exists
  644. let existing_keyset = {
  645. let existing_keyset = keysets_table
  646. .get(keyset.id.to_bytes().as_slice())
  647. .map_err(Error::from)?;
  648. existing_keyset.map(|r| r.value().to_string())
  649. };
  650. let existing = u32_table
  651. .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
  652. .map_err(Error::from)?;
  653. match existing {
  654. None => existing_u32 = false,
  655. Some(id) => {
  656. let id = Id::from_str(id.value())?;
  657. if id == keyset.id {
  658. existing_u32 = false;
  659. } else {
  660. existing_u32 = true;
  661. break;
  662. }
  663. }
  664. }
  665. let keyset = if let Some(existing_keyset) = existing_keyset {
  666. let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
  667. existing_keyset.active = keyset.active;
  668. existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
  669. existing_keyset
  670. } else {
  671. table
  672. .insert(
  673. mint_url.to_string().as_str(),
  674. keyset.id.to_bytes().as_slice(),
  675. )
  676. .map_err(Error::from)?;
  677. keyset
  678. };
  679. keysets_table
  680. .insert(
  681. keyset.id.to_bytes().as_slice(),
  682. serde_json::to_string(&keyset)
  683. .map_err(Error::from)?
  684. .as_str(),
  685. )
  686. .map_err(Error::from)?;
  687. }
  688. if existing_u32 {
  689. tracing::warn!("Keyset already exists for keyset id");
  690. return Err(database::Error::Duplicate);
  691. }
  692. }
  693. write_txn.commit().map_err(Error::from)?;
  694. Ok(())
  695. }
  696. #[instrument(skip_all)]
  697. async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), database::Error> {
  698. let write_txn = self.db.begin_write().map_err(Error::from)?;
  699. {
  700. let mut table = write_txn
  701. .open_table(MINT_QUOTES_TABLE)
  702. .map_err(Error::from)?;
  703. table
  704. .insert(
  705. quote.id.as_str(),
  706. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  707. )
  708. .map_err(Error::from)?;
  709. }
  710. write_txn.commit().map_err(Error::from)?;
  711. Ok(())
  712. }
  713. #[instrument(skip_all)]
  714. async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), database::Error> {
  715. let write_txn = self.db.begin_write().map_err(Error::from)?;
  716. {
  717. let mut table = write_txn
  718. .open_table(MINT_QUOTES_TABLE)
  719. .map_err(Error::from)?;
  720. table.remove(quote_id).map_err(Error::from)?;
  721. }
  722. write_txn.commit().map_err(Error::from)?;
  723. Ok(())
  724. }
  725. #[instrument(skip_all)]
  726. async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
  727. let write_txn = self.db.begin_write().map_err(Error::from)?;
  728. {
  729. let mut table = write_txn
  730. .open_table(MELT_QUOTES_TABLE)
  731. .map_err(Error::from)?;
  732. table
  733. .insert(
  734. quote.id.as_str(),
  735. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  736. )
  737. .map_err(Error::from)?;
  738. }
  739. write_txn.commit().map_err(Error::from)?;
  740. Ok(())
  741. }
  742. #[instrument(skip_all)]
  743. async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), database::Error> {
  744. let write_txn = self.db.begin_write().map_err(Error::from)?;
  745. {
  746. let mut table = write_txn
  747. .open_table(MELT_QUOTES_TABLE)
  748. .map_err(Error::from)?;
  749. table.remove(quote_id).map_err(Error::from)?;
  750. }
  751. write_txn.commit().map_err(Error::from)?;
  752. Ok(())
  753. }
  754. #[instrument(skip_all)]
  755. async fn add_keys(&self, keyset: KeySet) -> Result<(), database::Error> {
  756. let write_txn = self.db.begin_write().map_err(Error::from)?;
  757. keyset.verify_id()?;
  758. {
  759. let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  760. let existing_keys = table
  761. .insert(
  762. keyset.id.to_string().as_str(),
  763. serde_json::to_string(&keyset.keys)
  764. .map_err(Error::from)?
  765. .as_str(),
  766. )
  767. .map_err(Error::from)?
  768. .is_some();
  769. let mut table = write_txn
  770. .open_table(KEYSET_U32_MAPPING)
  771. .map_err(Error::from)?;
  772. let existing = table
  773. .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
  774. .map_err(Error::from)?;
  775. let existing_u32 = match existing {
  776. None => false,
  777. Some(id) => {
  778. let id = Id::from_str(id.value())?;
  779. id != keyset.id
  780. }
  781. };
  782. if existing_keys || existing_u32 {
  783. tracing::warn!("Keys already exist for keyset id");
  784. return Err(database::Error::Duplicate);
  785. }
  786. }
  787. write_txn.commit().map_err(Error::from)?;
  788. Ok(())
  789. }
  790. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  791. async fn remove_keys(&self, keyset_id: &Id) -> Result<(), database::Error> {
  792. let write_txn = self.db.begin_write().map_err(Error::from)?;
  793. {
  794. let mut table = write_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  795. table
  796. .remove(keyset_id.to_string().as_str())
  797. .map_err(Error::from)?;
  798. }
  799. write_txn.commit().map_err(Error::from)?;
  800. Ok(())
  801. }
  802. #[instrument(skip(self))]
  803. async fn remove_transaction(
  804. &self,
  805. transaction_id: TransactionId,
  806. ) -> Result<(), database::Error> {
  807. let write_txn = self.db.begin_write().map_err(Error::from)?;
  808. {
  809. let mut table = write_txn
  810. .open_table(TRANSACTIONS_TABLE)
  811. .map_err(Error::from)?;
  812. table
  813. .remove(transaction_id.as_slice())
  814. .map_err(Error::from)?;
  815. }
  816. write_txn.commit().map_err(Error::from)?;
  817. Ok(())
  818. }
  819. // KV Store write methods (non-transactional)
  820. #[instrument(skip(self, value))]
  821. async fn kv_write(
  822. &self,
  823. primary_namespace: &str,
  824. secondary_namespace: &str,
  825. key: &str,
  826. value: &[u8],
  827. ) -> Result<(), database::Error> {
  828. // Validate parameters according to KV store requirements
  829. validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
  830. let write_txn = self.db.begin_write().map_err(Error::from)?;
  831. {
  832. let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  833. table
  834. .insert((primary_namespace, secondary_namespace, key), value)
  835. .map_err(Error::from)?;
  836. }
  837. write_txn.commit().map_err(Error::from)?;
  838. Ok(())
  839. }
  840. #[instrument(skip(self))]
  841. async fn kv_remove(
  842. &self,
  843. primary_namespace: &str,
  844. secondary_namespace: &str,
  845. key: &str,
  846. ) -> Result<(), database::Error> {
  847. // Validate parameters according to KV store requirements
  848. validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
  849. let write_txn = self.db.begin_write().map_err(Error::from)?;
  850. {
  851. let mut table = write_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  852. table
  853. .remove((primary_namespace, secondary_namespace, key))
  854. .map_err(Error::from)?;
  855. }
  856. write_txn.commit().map_err(Error::from)?;
  857. Ok(())
  858. }
  859. }
  860. #[async_trait]
  861. impl KVStoreDatabase for WalletRedbDatabase {
  862. type Err = database::Error;
  863. #[instrument(skip_all)]
  864. async fn kv_read(
  865. &self,
  866. primary_namespace: &str,
  867. secondary_namespace: &str,
  868. key: &str,
  869. ) -> Result<Option<Vec<u8>>, Self::Err> {
  870. // Validate parameters according to KV store requirements
  871. validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;
  872. let read_txn = self.db.begin_read().map_err(Error::from)?;
  873. let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  874. let result = table
  875. .get((primary_namespace, secondary_namespace, key))
  876. .map_err(Error::from)?
  877. .map(|v| v.value().to_vec());
  878. Ok(result)
  879. }
  880. #[instrument(skip_all)]
  881. async fn kv_list(
  882. &self,
  883. primary_namespace: &str,
  884. secondary_namespace: &str,
  885. ) -> Result<Vec<String>, Self::Err> {
  886. validate_kvstore_params(primary_namespace, secondary_namespace, None)?;
  887. let read_txn = self.db.begin_read().map_err(Error::from)?;
  888. let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  889. let mut keys = Vec::new();
  890. let start = (primary_namespace, secondary_namespace, "");
  891. for result in table.range(start..).map_err(Error::from)? {
  892. let (key_tuple, _) = result.map_err(Error::from)?;
  893. let (primary_from_db, secondary_from_db, k) = key_tuple.value();
  894. if primary_from_db != primary_namespace || secondary_from_db != secondary_namespace {
  895. break;
  896. }
  897. keys.push(k.to_string());
  898. }
  899. Ok(keys)
  900. }
  901. }
  #[cfg(test)]
  mod test {
      use cdk_common::wallet_db_test;

      use super::WalletRedbDatabase;

      /// Open a database file named after `test_id` for the shared wallet test suite.
      ///
      /// The file lives in the platform temp directory rather than a
      /// hard-coded `/tmp`, so the tests also run where `/tmp` does not exist.
      async fn provide_db(test_id: String) -> WalletRedbDatabase {
          let path = std::env::temp_dir().join(format!("cdk-test-{}.redb", test_id));
          WalletRedbDatabase::new(&path).expect("database")
      }

      wallet_db_test!(provide_db);
  }