mod.rs 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218
  1. //! Redb Wallet
  2. use std::cmp::Ordering;
  3. use std::collections::HashMap;
  4. use std::path::Path;
  5. use std::str::FromStr;
  6. use std::sync::Arc;
  7. use async_trait::async_trait;
  8. use cdk_common::common::ProofInfo;
  9. use cdk_common::database::{
  10. validate_kvstore_params, validate_kvstore_string, DbTransactionFinalizer, KVStore,
  11. KVStoreDatabase, KVStoreTransaction, WalletDatabase, WalletDatabaseTransaction,
  12. };
  13. use cdk_common::mint_url::MintUrl;
  14. use cdk_common::util::unix_time;
  15. use cdk_common::wallet::{self, MintQuote, Transaction, TransactionDirection, TransactionId};
  16. use cdk_common::{
  17. database, Amount, CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PaymentMethod,
  18. PublicKey, SpendingConditions, State,
  19. };
  20. use redb::{Database, MultimapTableDefinition, ReadableTable, TableDefinition};
  21. use tracing::instrument;
  22. use super::error::Error;
  23. use crate::migrations::migrate_00_to_01;
  24. use crate::wallet::migrations::{migrate_01_to_02, migrate_02_to_03, migrate_03_to_04};
  25. mod migrations;
  26. // <Mint_url, Info>
  27. const MINTS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mints_table");
  28. // <Mint_Url, Keyset_id>
  29. const MINT_KEYSETS_TABLE: MultimapTableDefinition<&str, &[u8]> =
  30. MultimapTableDefinition::new("mint_keysets");
  31. // <Keyset_id, KeysetInfo>
  32. const KEYSETS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("keysets");
  33. // <Quote_id, quote>
  34. const MINT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_quotes");
  35. // <Quote_id, quote>
  36. const MELT_QUOTES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("melt_quotes");
  37. const MINT_KEYS_TABLE: TableDefinition<&str, &str> = TableDefinition::new("mint_keys");
  38. // <Y, Proof Info>
  39. const PROOFS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("proofs");
  40. const CONFIG_TABLE: TableDefinition<&str, &str> = TableDefinition::new("config");
  41. const KEYSET_COUNTER: TableDefinition<&str, u32> = TableDefinition::new("keyset_counter");
  42. // <Transaction_id, Transaction>
  43. const TRANSACTIONS_TABLE: TableDefinition<&[u8], &str> = TableDefinition::new("transactions");
  44. const KEYSET_U32_MAPPING: TableDefinition<u32, &str> = TableDefinition::new("keyset_u32_mapping");
  45. // <(primary_namespace, secondary_namespace, key), value>
  46. const KV_STORE_TABLE: TableDefinition<(&str, &str, &str), &[u8]> = TableDefinition::new("kv_store");
  47. const DATABASE_VERSION: u32 = 4;
  48. /// Wallet Redb Database
  49. #[derive(Debug, Clone)]
  50. pub struct WalletRedbDatabase {
  51. db: Arc<Database>,
  52. }
  53. /// Redb Wallet Transaction
  54. pub struct RedbWalletTransaction {
  55. write_txn: Option<redb::WriteTransaction>,
  56. }
  57. impl RedbWalletTransaction {
  58. /// Create a new transaction
  59. fn new(write_txn: redb::WriteTransaction) -> Self {
  60. Self {
  61. write_txn: Some(write_txn),
  62. }
  63. }
  64. /// Get a mutable reference to the write transaction
  65. fn txn(&mut self) -> Result<&mut redb::WriteTransaction, Error> {
  66. self.write_txn.as_mut().ok_or_else(|| {
  67. Error::CDKDatabase(database::Error::Internal(
  68. "Transaction already consumed".to_owned(),
  69. ))
  70. })
  71. }
  72. }
  73. impl WalletRedbDatabase {
  74. /// Create new [`WalletRedbDatabase`]
  75. pub fn new(path: &Path) -> Result<Self, Error> {
  76. {
  77. // Check if parent directory exists before attempting to create database
  78. if let Some(parent) = path.parent() {
  79. if !parent.exists() {
  80. return Err(Error::Io(std::io::Error::new(
  81. std::io::ErrorKind::NotFound,
  82. format!("Parent directory does not exist: {parent:?}"),
  83. )));
  84. }
  85. }
  86. let db = Arc::new(Database::create(path)?);
  87. let db_version: Option<String>;
  88. {
  89. // Check database version
  90. let read_txn = db.begin_read()?;
  91. let table = read_txn.open_table(CONFIG_TABLE);
  92. db_version = match table {
  93. Ok(table) => table.get("db_version")?.map(|v| v.value().to_string()),
  94. Err(_) => None,
  95. };
  96. }
  97. match db_version {
  98. Some(db_version) => {
  99. let mut current_file_version = u32::from_str(&db_version)?;
  100. tracing::info!("Current file version {}", current_file_version);
  101. match current_file_version.cmp(&DATABASE_VERSION) {
  102. Ordering::Less => {
  103. tracing::info!(
  104. "Database needs to be upgraded at {} current is {}",
  105. current_file_version,
  106. DATABASE_VERSION
  107. );
  108. if current_file_version == 0 {
  109. current_file_version = migrate_00_to_01(Arc::clone(&db))?;
  110. }
  111. if current_file_version == 1 {
  112. current_file_version = migrate_01_to_02(Arc::clone(&db))?;
  113. }
  114. if current_file_version == 2 {
  115. current_file_version = migrate_02_to_03(Arc::clone(&db))?;
  116. }
  117. if current_file_version == 3 {
  118. current_file_version = migrate_03_to_04(Arc::clone(&db))?;
  119. }
  120. if current_file_version != DATABASE_VERSION {
  121. tracing::warn!(
  122. "Database upgrade did not complete at {} current is {}",
  123. current_file_version,
  124. DATABASE_VERSION
  125. );
  126. return Err(Error::UnknownDatabaseVersion);
  127. }
  128. let write_txn = db.begin_write()?;
  129. {
  130. let mut table = write_txn.open_table(CONFIG_TABLE)?;
  131. table
  132. .insert("db_version", DATABASE_VERSION.to_string().as_str())?;
  133. }
  134. write_txn.commit()?;
  135. }
  136. Ordering::Equal => {
  137. tracing::info!("Database is at current version {}", DATABASE_VERSION);
  138. }
  139. Ordering::Greater => {
  140. tracing::warn!(
  141. "Database upgrade did not complete at {} current is {}",
  142. current_file_version,
  143. DATABASE_VERSION
  144. );
  145. return Err(Error::UnknownDatabaseVersion);
  146. }
  147. }
  148. }
  149. None => {
  150. let write_txn = db.begin_write()?;
  151. {
  152. let mut table = write_txn.open_table(CONFIG_TABLE)?;
  153. // Open all tables to init a new db
  154. let _ = write_txn.open_table(MINTS_TABLE)?;
  155. let _ = write_txn.open_multimap_table(MINT_KEYSETS_TABLE)?;
  156. let _ = write_txn.open_table(KEYSETS_TABLE)?;
  157. let _ = write_txn.open_table(MINT_QUOTES_TABLE)?;
  158. let _ = write_txn.open_table(MELT_QUOTES_TABLE)?;
  159. let _ = write_txn.open_table(MINT_KEYS_TABLE)?;
  160. let _ = write_txn.open_table(PROOFS_TABLE)?;
  161. let _ = write_txn.open_table(KEYSET_COUNTER)?;
  162. let _ = write_txn.open_table(TRANSACTIONS_TABLE)?;
  163. let _ = write_txn.open_table(KEYSET_U32_MAPPING)?;
  164. let _ = write_txn.open_table(KV_STORE_TABLE)?;
  165. table.insert("db_version", DATABASE_VERSION.to_string().as_str())?;
  166. }
  167. write_txn.commit()?;
  168. }
  169. }
  170. drop(db);
  171. }
  172. // Check parent directory again for final database creation
  173. if let Some(parent) = path.parent() {
  174. if !parent.exists() {
  175. return Err(Error::Io(std::io::Error::new(
  176. std::io::ErrorKind::NotFound,
  177. format!("Parent directory does not exist: {parent:?}"),
  178. )));
  179. }
  180. }
  181. let mut db = Database::create(path)?;
  182. db.upgrade()?;
  183. Ok(Self { db: Arc::new(db) })
  184. }
  185. }
  186. #[async_trait]
  187. impl WalletDatabase for WalletRedbDatabase {
  188. type Err = database::Error;
  189. #[instrument(skip(self))]
  190. async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, Self::Err> {
  191. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  192. let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  193. if let Some(mint_info) = table
  194. .get(mint_url.to_string().as_str())
  195. .map_err(Error::from)?
  196. {
  197. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  198. }
  199. Ok(None)
  200. }
  201. #[instrument(skip(self))]
  202. async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, Self::Err> {
  203. let read_txn = self.db.begin_read().map_err(Error::from)?;
  204. let table = read_txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  205. let mints = table
  206. .iter()
  207. .map_err(Error::from)?
  208. .flatten()
  209. .filter_map(|(mint, mint_info)| {
  210. MintUrl::from_str(mint.value())
  211. .ok()
  212. .map(|url| (url, serde_json::from_str(mint_info.value()).ok()))
  213. })
  214. .collect();
  215. Ok(mints)
  216. }
  217. #[instrument(skip(self))]
  218. async fn get_mint_keysets(
  219. &self,
  220. mint_url: MintUrl,
  221. ) -> Result<Option<Vec<KeySetInfo>>, Self::Err> {
  222. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  223. let table = read_txn
  224. .open_multimap_table(MINT_KEYSETS_TABLE)
  225. .map_err(Error::from)?;
  226. let keyset_ids = table
  227. .get(mint_url.to_string().as_str())
  228. .map_err(Error::from)?
  229. .flatten()
  230. .map(|k| Id::from_bytes(k.value()))
  231. .collect::<Result<Vec<_>, _>>()?;
  232. let mut keysets = vec![];
  233. let keysets_t = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  234. for keyset_id in keyset_ids {
  235. if let Some(keyset) = keysets_t
  236. .get(keyset_id.to_bytes().as_slice())
  237. .map_err(Error::from)?
  238. {
  239. let keyset = serde_json::from_str(keyset.value()).map_err(Error::from)?;
  240. keysets.push(keyset);
  241. }
  242. }
  243. match keysets.is_empty() {
  244. true => Ok(None),
  245. false => Ok(Some(keysets)),
  246. }
  247. }
  248. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  249. async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, Self::Err> {
  250. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  251. let table = read_txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  252. match table
  253. .get(keyset_id.to_bytes().as_slice())
  254. .map_err(Error::from)?
  255. {
  256. Some(keyset) => {
  257. let keyset: KeySetInfo =
  258. serde_json::from_str(keyset.value()).map_err(Error::from)?;
  259. Ok(Some(keyset))
  260. }
  261. None => Ok(None),
  262. }
  263. }
  264. #[instrument(skip_all)]
  265. async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
  266. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  267. let table = read_txn
  268. .open_table(MINT_QUOTES_TABLE)
  269. .map_err(Error::from)?;
  270. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  271. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  272. }
  273. Ok(None)
  274. }
  275. #[instrument(skip_all)]
  276. async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
  277. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  278. let table = read_txn
  279. .open_table(MINT_QUOTES_TABLE)
  280. .map_err(Error::from)?;
  281. Ok(table
  282. .iter()
  283. .map_err(Error::from)?
  284. .flatten()
  285. .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
  286. .collect())
  287. }
  288. async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
  289. let read_txn = self.db.begin_read().map_err(Into::<Error>::into)?;
  290. let table = read_txn
  291. .open_table(MINT_QUOTES_TABLE)
  292. .map_err(Error::from)?;
  293. Ok(table
  294. .iter()
  295. .map_err(Error::from)?
  296. .flatten()
  297. .flat_map(|(_id, quote)| serde_json::from_str::<MintQuote>(quote.value()).ok())
  298. .filter(|quote| {
  299. quote.amount_issued == Amount::ZERO || quote.payment_method == PaymentMethod::Bolt12
  300. })
  301. .collect())
  302. }
  303. #[instrument(skip_all)]
  304. async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<wallet::MeltQuote>, Self::Err> {
  305. let read_txn = self.db.begin_read().map_err(Error::from)?;
  306. let table = read_txn
  307. .open_table(MELT_QUOTES_TABLE)
  308. .map_err(Error::from)?;
  309. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  310. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  311. }
  312. Ok(None)
  313. }
  314. #[instrument(skip_all)]
  315. async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, Self::Err> {
  316. let read_txn = self.db.begin_read().map_err(Error::from)?;
  317. let table = read_txn
  318. .open_table(MELT_QUOTES_TABLE)
  319. .map_err(Error::from)?;
  320. Ok(table
  321. .iter()
  322. .map_err(Error::from)?
  323. .flatten()
  324. .flat_map(|(_id, quote)| serde_json::from_str(quote.value()))
  325. .collect())
  326. }
  327. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  328. async fn get_keys(&self, keyset_id: &Id) -> Result<Option<Keys>, Self::Err> {
  329. let read_txn = self.db.begin_read().map_err(Error::from)?;
  330. let table = read_txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  331. if let Some(mint_info) = table
  332. .get(keyset_id.to_string().as_str())
  333. .map_err(Error::from)?
  334. {
  335. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  336. }
  337. Ok(None)
  338. }
  339. #[instrument(skip_all)]
  340. async fn get_proofs(
  341. &self,
  342. mint_url: Option<MintUrl>,
  343. unit: Option<CurrencyUnit>,
  344. state: Option<Vec<State>>,
  345. spending_conditions: Option<Vec<SpendingConditions>>,
  346. ) -> Result<Vec<ProofInfo>, Self::Err> {
  347. let read_txn = self.db.begin_read().map_err(Error::from)?;
  348. let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  349. let proofs: Vec<ProofInfo> = table
  350. .iter()
  351. .map_err(Error::from)?
  352. .flatten()
  353. .filter_map(|(_k, v)| {
  354. let mut proof = None;
  355. if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
  356. if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
  357. {
  358. proof = Some(proof_info)
  359. }
  360. }
  361. proof
  362. })
  363. .collect();
  364. Ok(proofs)
  365. }
  366. #[instrument(skip(self, ys))]
  367. async fn get_proofs_by_ys(&self, ys: Vec<PublicKey>) -> Result<Vec<ProofInfo>, Self::Err> {
  368. if ys.is_empty() {
  369. return Ok(Vec::new());
  370. }
  371. let read_txn = self.db.begin_read().map_err(Error::from)?;
  372. let table = read_txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  373. let mut proofs = Vec::new();
  374. for y in ys {
  375. if let Some(proof) = table.get(y.to_bytes().as_slice()).map_err(Error::from)? {
  376. let proof_info =
  377. serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
  378. proofs.push(proof_info);
  379. }
  380. }
  381. Ok(proofs)
  382. }
  383. async fn get_balance(
  384. &self,
  385. mint_url: Option<MintUrl>,
  386. unit: Option<CurrencyUnit>,
  387. state: Option<Vec<State>>,
  388. ) -> Result<u64, database::Error> {
  389. // For redb, we still need to fetch all proofs and sum them
  390. // since redb doesn't have SQL aggregation
  391. let proofs = self.get_proofs(mint_url, unit, state, None).await?;
  392. Ok(proofs.iter().map(|p| u64::from(p.proof.amount)).sum())
  393. }
  394. #[instrument(skip(self))]
  395. async fn get_transaction(
  396. &self,
  397. transaction_id: TransactionId,
  398. ) -> Result<Option<Transaction>, Self::Err> {
  399. let read_txn = self.db.begin_read().map_err(Error::from)?;
  400. let table = read_txn
  401. .open_table(TRANSACTIONS_TABLE)
  402. .map_err(Error::from)?;
  403. if let Some(transaction) = table.get(transaction_id.as_slice()).map_err(Error::from)? {
  404. return Ok(serde_json::from_str(transaction.value()).map_err(Error::from)?);
  405. }
  406. Ok(None)
  407. }
  408. #[instrument(skip(self))]
  409. async fn list_transactions(
  410. &self,
  411. mint_url: Option<MintUrl>,
  412. direction: Option<TransactionDirection>,
  413. unit: Option<CurrencyUnit>,
  414. ) -> Result<Vec<Transaction>, Self::Err> {
  415. let read_txn = self.db.begin_read().map_err(Error::from)?;
  416. let table = read_txn
  417. .open_table(TRANSACTIONS_TABLE)
  418. .map_err(Error::from)?;
  419. let transactions: Vec<Transaction> = table
  420. .iter()
  421. .map_err(Error::from)?
  422. .flatten()
  423. .filter_map(|(_k, v)| {
  424. let mut transaction = None;
  425. if let Ok(tx) = serde_json::from_str::<Transaction>(v.value()) {
  426. if tx.matches_conditions(&mint_url, &direction, &unit) {
  427. transaction = Some(tx)
  428. }
  429. }
  430. transaction
  431. })
  432. .collect();
  433. Ok(transactions)
  434. }
  435. async fn begin_db_transaction(
  436. &self,
  437. ) -> Result<Box<dyn WalletDatabaseTransaction<Self::Err> + Send + Sync>, Self::Err> {
  438. let write_txn = self.db.begin_write().map_err(Error::from)?;
  439. Ok(Box::new(RedbWalletTransaction::new(write_txn)))
  440. }
  441. }
  442. #[async_trait]
  443. impl KVStoreDatabase for WalletRedbDatabase {
  444. type Err = database::Error;
  445. #[instrument(skip_all)]
  446. async fn kv_read(
  447. &self,
  448. primary_namespace: &str,
  449. secondary_namespace: &str,
  450. key: &str,
  451. ) -> Result<Option<Vec<u8>>, Self::Err> {
  452. // Validate parameters according to KV store requirements
  453. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  454. let read_txn = self.db.begin_read().map_err(Error::from)?;
  455. let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  456. let result = table
  457. .get((primary_namespace, secondary_namespace, key))
  458. .map_err(Error::from)?
  459. .map(|v| v.value().to_vec());
  460. Ok(result)
  461. }
  462. #[instrument(skip_all)]
  463. async fn kv_list(
  464. &self,
  465. primary_namespace: &str,
  466. secondary_namespace: &str,
  467. ) -> Result<Vec<String>, Self::Err> {
  468. // Validate namespace parameters according to KV store requirements
  469. validate_kvstore_string(primary_namespace)?;
  470. validate_kvstore_string(secondary_namespace)?;
  471. // Check empty namespace rules
  472. if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
  473. return Err(database::Error::KVStoreInvalidKey(
  474. "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
  475. ));
  476. }
  477. let read_txn = self.db.begin_read().map_err(Error::from)?;
  478. let table = read_txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  479. let mut keys = Vec::new();
  480. // Use range iterator for efficient lookup by namespace prefix
  481. // Range from (primary, secondary, "") to (primary, secondary, "\u{10FFFF}") to get all keys in this namespace
  482. let start = (primary_namespace, secondary_namespace, "");
  483. let end = (primary_namespace, secondary_namespace, "\u{10FFFF}");
  484. for result in table.range(start..=end).map_err(Error::from)? {
  485. let (key_tuple, _) = result.map_err(Error::from)?;
  486. let (_primary, _secondary, k) = key_tuple.value();
  487. keys.push(k.to_string());
  488. }
  489. // Keys are already sorted by the B-tree structure
  490. Ok(keys)
  491. }
  492. }
  493. #[async_trait]
  494. impl KVStore for WalletRedbDatabase {
  495. async fn begin_transaction<'a>(
  496. &'a self,
  497. ) -> Result<Box<dyn KVStoreTransaction<'a, Self::Err> + Send + Sync + 'a>, database::Error>
  498. {
  499. let write_txn = self.db.begin_write().map_err(Error::from)?;
  500. Ok(Box::new(RedbWalletTransaction::new(write_txn)))
  501. }
  502. }
  503. #[async_trait]
  504. impl WalletDatabaseTransaction<database::Error> for RedbWalletTransaction {
  505. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  506. async fn get_keyset_by_id(
  507. &mut self,
  508. keyset_id: &Id,
  509. ) -> Result<Option<KeySetInfo>, database::Error> {
  510. let txn = self.txn().map_err(Into::<database::Error>::into)?;
  511. let table = txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  512. let result = match table
  513. .get(keyset_id.to_bytes().as_slice())
  514. .map_err(Error::from)?
  515. {
  516. Some(keyset) => {
  517. let keyset: KeySetInfo =
  518. serde_json::from_str(keyset.value()).map_err(Error::from)?;
  519. Ok(Some(keyset))
  520. }
  521. None => Ok(None),
  522. };
  523. result
  524. }
  525. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  526. async fn get_keys(&mut self, keyset_id: &Id) -> Result<Option<Keys>, database::Error> {
  527. let txn = self.txn().map_err(Into::<database::Error>::into)?;
  528. let table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  529. if let Some(mint_info) = table
  530. .get(keyset_id.to_string().as_str())
  531. .map_err(Error::from)?
  532. {
  533. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  534. }
  535. Ok(None)
  536. }
  537. #[instrument(skip(self))]
  538. async fn add_mint(
  539. &mut self,
  540. mint_url: MintUrl,
  541. mint_info: Option<MintInfo>,
  542. ) -> Result<(), database::Error> {
  543. let txn = self.txn()?;
  544. let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  545. table
  546. .insert(
  547. mint_url.to_string().as_str(),
  548. serde_json::to_string(&mint_info)
  549. .map_err(Error::from)?
  550. .as_str(),
  551. )
  552. .map_err(Error::from)?;
  553. Ok(())
  554. }
  555. #[instrument(skip(self))]
  556. async fn remove_mint(&mut self, mint_url: MintUrl) -> Result<(), database::Error> {
  557. let txn = self.txn()?;
  558. let mut table = txn.open_table(MINTS_TABLE).map_err(Error::from)?;
  559. table
  560. .remove(mint_url.to_string().as_str())
  561. .map_err(Error::from)?;
  562. Ok(())
  563. }
  564. #[instrument(skip(self))]
  565. async fn update_mint_url(
  566. &mut self,
  567. old_mint_url: MintUrl,
  568. new_mint_url: MintUrl,
  569. ) -> Result<(), database::Error> {
  570. // Update proofs table
  571. {
  572. let proofs = self
  573. .get_proofs(Some(old_mint_url.clone()), None, None, None)
  574. .await
  575. .map_err(Error::from)?;
  576. // Proofs with new url
  577. let updated_proofs: Vec<ProofInfo> = proofs
  578. .clone()
  579. .into_iter()
  580. .map(|mut p| {
  581. p.mint_url = new_mint_url.clone();
  582. p
  583. })
  584. .collect();
  585. if !updated_proofs.is_empty() {
  586. self.update_proofs(updated_proofs, vec![]).await?;
  587. }
  588. }
  589. // Update mint quotes
  590. {
  591. let read_txn = self.txn()?;
  592. let mut table = read_txn
  593. .open_table(MINT_QUOTES_TABLE)
  594. .map_err(Error::from)?;
  595. let unix_time = unix_time();
  596. let quotes = table
  597. .iter()
  598. .map_err(Error::from)?
  599. .flatten()
  600. .filter_map(|(_, quote)| {
  601. let mut q: MintQuote = serde_json::from_str(quote.value())
  602. .inspect_err(|err| {
  603. tracing::warn!(
  604. "Failed to deserialize {} with error {}",
  605. quote.value(),
  606. err
  607. )
  608. })
  609. .ok()?;
  610. if q.expiry < unix_time {
  611. q.mint_url = new_mint_url.clone();
  612. Some(q)
  613. } else {
  614. None
  615. }
  616. })
  617. .collect::<Vec<_>>();
  618. for quote in quotes {
  619. table
  620. .insert(
  621. quote.id.as_str(),
  622. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  623. )
  624. .map_err(Error::from)?;
  625. }
  626. }
  627. Ok(())
  628. }
  629. #[instrument(skip(self))]
  630. async fn add_mint_keysets(
  631. &mut self,
  632. mint_url: MintUrl,
  633. keysets: Vec<KeySetInfo>,
  634. ) -> Result<(), database::Error> {
  635. let txn = self.txn()?;
  636. let mut table = txn
  637. .open_multimap_table(MINT_KEYSETS_TABLE)
  638. .map_err(Error::from)?;
  639. let mut keysets_table = txn.open_table(KEYSETS_TABLE).map_err(Error::from)?;
  640. let mut u32_table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
  641. let mut existing_u32 = false;
  642. for keyset in keysets {
  643. // Check if keyset already exists
  644. let existing_keyset = {
  645. let existing_keyset = keysets_table
  646. .get(keyset.id.to_bytes().as_slice())
  647. .map_err(Error::from)?;
  648. existing_keyset.map(|r| r.value().to_string())
  649. };
  650. let existing = u32_table
  651. .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
  652. .map_err(Error::from)?;
  653. match existing {
  654. None => existing_u32 = false,
  655. Some(id) => {
  656. let id = Id::from_str(id.value())?;
  657. if id == keyset.id {
  658. existing_u32 = false;
  659. } else {
  660. existing_u32 = true;
  661. break;
  662. }
  663. }
  664. }
  665. let keyset = if let Some(existing_keyset) = existing_keyset {
  666. let mut existing_keyset: KeySetInfo = serde_json::from_str(&existing_keyset)?;
  667. existing_keyset.active = keyset.active;
  668. existing_keyset.input_fee_ppk = keyset.input_fee_ppk;
  669. existing_keyset
  670. } else {
  671. table
  672. .insert(
  673. mint_url.to_string().as_str(),
  674. keyset.id.to_bytes().as_slice(),
  675. )
  676. .map_err(Error::from)?;
  677. keyset
  678. };
  679. keysets_table
  680. .insert(
  681. keyset.id.to_bytes().as_slice(),
  682. serde_json::to_string(&keyset)
  683. .map_err(Error::from)?
  684. .as_str(),
  685. )
  686. .map_err(Error::from)?;
  687. }
  688. if existing_u32 {
  689. tracing::warn!("Keyset already exists for keyset id");
  690. return Err(database::Error::Duplicate);
  691. }
  692. Ok(())
  693. }
  694. #[instrument(skip_all)]
  695. async fn get_mint_quote(
  696. &mut self,
  697. quote_id: &str,
  698. ) -> Result<Option<MintQuote>, database::Error> {
  699. let txn = self.txn()?;
  700. let table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
  701. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  702. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  703. }
  704. Ok(None)
  705. }
  706. #[instrument(skip_all)]
  707. async fn add_mint_quote(&mut self, quote: MintQuote) -> Result<(), database::Error> {
  708. let txn = self.txn()?;
  709. let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
  710. table
  711. .insert(
  712. quote.id.as_str(),
  713. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  714. )
  715. .map_err(Error::from)?;
  716. Ok(())
  717. }
  718. #[instrument(skip_all)]
  719. async fn remove_mint_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
  720. let txn = self.txn()?;
  721. let mut table = txn.open_table(MINT_QUOTES_TABLE).map_err(Error::from)?;
  722. table.remove(quote_id).map_err(Error::from)?;
  723. Ok(())
  724. }
  725. #[instrument(skip_all)]
  726. async fn get_melt_quote(
  727. &mut self,
  728. quote_id: &str,
  729. ) -> Result<Option<wallet::MeltQuote>, database::Error> {
  730. let txn = self.txn()?;
  731. let table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
  732. if let Some(mint_info) = table.get(quote_id).map_err(Error::from)? {
  733. return Ok(serde_json::from_str(mint_info.value()).map_err(Error::from)?);
  734. }
  735. Ok(None)
  736. }
  737. #[instrument(skip_all)]
  738. async fn add_melt_quote(&mut self, quote: wallet::MeltQuote) -> Result<(), database::Error> {
  739. let txn = self.txn()?;
  740. let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
  741. table
  742. .insert(
  743. quote.id.as_str(),
  744. serde_json::to_string(&quote).map_err(Error::from)?.as_str(),
  745. )
  746. .map_err(Error::from)?;
  747. Ok(())
  748. }
  749. #[instrument(skip_all)]
  750. async fn remove_melt_quote(&mut self, quote_id: &str) -> Result<(), database::Error> {
  751. let txn = self.txn()?;
  752. let mut table = txn.open_table(MELT_QUOTES_TABLE).map_err(Error::from)?;
  753. table.remove(quote_id).map_err(Error::from)?;
  754. Ok(())
  755. }
  756. #[instrument(skip_all)]
  757. async fn add_keys(&mut self, keyset: KeySet) -> Result<(), database::Error> {
  758. let txn = self.txn()?;
  759. keyset.verify_id()?;
  760. let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  761. let existing_keys = table
  762. .insert(
  763. keyset.id.to_string().as_str(),
  764. serde_json::to_string(&keyset.keys)
  765. .map_err(Error::from)?
  766. .as_str(),
  767. )
  768. .map_err(Error::from)?
  769. .is_some();
  770. let mut table = txn.open_table(KEYSET_U32_MAPPING).map_err(Error::from)?;
  771. let existing = table
  772. .insert(u32::from(keyset.id), keyset.id.to_string().as_str())
  773. .map_err(Error::from)?;
  774. let existing_u32 = match existing {
  775. None => false,
  776. Some(id) => {
  777. let id = Id::from_str(id.value())?;
  778. id != keyset.id
  779. }
  780. };
  781. if existing_keys || existing_u32 {
  782. tracing::warn!("Keys already exist for keyset id");
  783. return Err(database::Error::Duplicate);
  784. }
  785. Ok(())
  786. }
  787. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  788. async fn remove_keys(&mut self, keyset_id: &Id) -> Result<(), database::Error> {
  789. let txn = self.txn()?;
  790. let mut table = txn.open_table(MINT_KEYS_TABLE).map_err(Error::from)?;
  791. table
  792. .remove(keyset_id.to_string().as_str())
  793. .map_err(Error::from)?;
  794. Ok(())
  795. }
  796. #[instrument(skip_all)]
  797. async fn get_proofs(
  798. &mut self,
  799. mint_url: Option<MintUrl>,
  800. unit: Option<CurrencyUnit>,
  801. state: Option<Vec<State>>,
  802. spending_conditions: Option<Vec<SpendingConditions>>,
  803. ) -> Result<Vec<ProofInfo>, database::Error> {
  804. let txn = self.txn()?;
  805. let table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  806. let proofs: Vec<ProofInfo> = table
  807. .iter()
  808. .map_err(Error::from)?
  809. .flatten()
  810. .filter_map(|(_k, v)| {
  811. let mut proof = None;
  812. if let Ok(proof_info) = serde_json::from_str::<ProofInfo>(v.value()) {
  813. if proof_info.matches_conditions(&mint_url, &unit, &state, &spending_conditions)
  814. {
  815. proof = Some(proof_info)
  816. }
  817. }
  818. proof
  819. })
  820. .collect();
  821. Ok(proofs)
  822. }
  823. #[instrument(skip(self, added, deleted_ys))]
  824. async fn update_proofs(
  825. &mut self,
  826. added: Vec<ProofInfo>,
  827. deleted_ys: Vec<PublicKey>,
  828. ) -> Result<(), database::Error> {
  829. let txn = self.txn()?;
  830. let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  831. for proof_info in added.iter() {
  832. table
  833. .insert(
  834. proof_info.y.to_bytes().as_slice(),
  835. serde_json::to_string(&proof_info)
  836. .map_err(Error::from)?
  837. .as_str(),
  838. )
  839. .map_err(Error::from)?;
  840. }
  841. for y in deleted_ys.iter() {
  842. table.remove(y.to_bytes().as_slice()).map_err(Error::from)?;
  843. }
  844. Ok(())
  845. }
  846. async fn update_proofs_state(
  847. &mut self,
  848. ys: Vec<PublicKey>,
  849. state: State,
  850. ) -> Result<(), database::Error> {
  851. let txn = self.txn()?;
  852. let mut table = txn.open_table(PROOFS_TABLE).map_err(Error::from)?;
  853. for y in ys {
  854. let y_slice = y.to_bytes();
  855. let proof = table
  856. .get(y_slice.as_slice())
  857. .map_err(Error::from)?
  858. .ok_or(Error::UnknownY)?;
  859. let mut proof_info =
  860. serde_json::from_str::<ProofInfo>(proof.value()).map_err(Error::from)?;
  861. drop(proof);
  862. proof_info.state = state;
  863. table
  864. .insert(
  865. y_slice.as_slice(),
  866. serde_json::to_string(&proof_info)
  867. .map_err(Error::from)?
  868. .as_str(),
  869. )
  870. .map_err(Error::from)?;
  871. }
  872. Ok(())
  873. }
  874. #[instrument(skip(self), fields(keyset_id = %keyset_id))]
  875. async fn increment_keyset_counter(
  876. &mut self,
  877. keyset_id: &Id,
  878. count: u32,
  879. ) -> Result<u32, database::Error> {
  880. let txn = self.txn()?;
  881. let mut table = txn.open_table(KEYSET_COUNTER).map_err(Error::from)?;
  882. let current_counter = table
  883. .get(keyset_id.to_string().as_str())
  884. .map_err(Error::from)?
  885. .map(|x| x.value())
  886. .unwrap_or_default();
  887. let new_counter = current_counter + count;
  888. table
  889. .insert(keyset_id.to_string().as_str(), new_counter)
  890. .map_err(Error::from)?;
  891. Ok(new_counter)
  892. }
  893. #[instrument(skip(self))]
  894. async fn add_transaction(&mut self, transaction: Transaction) -> Result<(), database::Error> {
  895. let id = transaction.id();
  896. let txn = self.txn()?;
  897. let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
  898. table
  899. .insert(
  900. id.as_slice(),
  901. serde_json::to_string(&transaction)
  902. .map_err(Error::from)?
  903. .as_str(),
  904. )
  905. .map_err(Error::from)?;
  906. Ok(())
  907. }
  908. #[instrument(skip(self))]
  909. async fn remove_transaction(
  910. &mut self,
  911. transaction_id: TransactionId,
  912. ) -> Result<(), database::Error> {
  913. let txn = self.txn()?;
  914. let mut table = txn.open_table(TRANSACTIONS_TABLE).map_err(Error::from)?;
  915. table
  916. .remove(transaction_id.as_slice())
  917. .map_err(Error::from)?;
  918. Ok(())
  919. }
  920. }
  921. #[async_trait]
  922. impl KVStoreTransaction<'_, database::Error> for RedbWalletTransaction {
  923. #[instrument(skip_all)]
  924. async fn kv_read(
  925. &mut self,
  926. primary_namespace: &str,
  927. secondary_namespace: &str,
  928. key: &str,
  929. ) -> Result<Option<Vec<u8>>, database::Error> {
  930. // Validate parameters according to KV store requirements
  931. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  932. let txn = self.txn()?;
  933. let table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  934. let result = table
  935. .get((primary_namespace, secondary_namespace, key))
  936. .map_err(Error::from)?
  937. .map(|v| v.value().to_vec());
  938. Ok(result)
  939. }
  940. #[instrument(skip_all)]
  941. async fn kv_write(
  942. &mut self,
  943. primary_namespace: &str,
  944. secondary_namespace: &str,
  945. key: &str,
  946. value: &[u8],
  947. ) -> Result<(), database::Error> {
  948. // Validate parameters according to KV store requirements
  949. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  950. let txn = self.txn()?;
  951. let mut table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  952. table
  953. .insert((primary_namespace, secondary_namespace, key), value)
  954. .map_err(Error::from)?;
  955. Ok(())
  956. }
  957. #[instrument(skip_all)]
  958. async fn kv_remove(
  959. &mut self,
  960. primary_namespace: &str,
  961. secondary_namespace: &str,
  962. key: &str,
  963. ) -> Result<(), database::Error> {
  964. // Validate parameters according to KV store requirements
  965. validate_kvstore_params(primary_namespace, secondary_namespace, key)?;
  966. let txn = self.txn()?;
  967. let mut table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  968. table
  969. .remove((primary_namespace, secondary_namespace, key))
  970. .map_err(Error::from)?;
  971. Ok(())
  972. }
  973. #[instrument(skip_all)]
  974. async fn kv_list(
  975. &mut self,
  976. primary_namespace: &str,
  977. secondary_namespace: &str,
  978. ) -> Result<Vec<String>, database::Error> {
  979. // Validate namespace parameters according to KV store requirements
  980. validate_kvstore_string(primary_namespace)?;
  981. validate_kvstore_string(secondary_namespace)?;
  982. // Check empty namespace rules
  983. if primary_namespace.is_empty() && !secondary_namespace.is_empty() {
  984. return Err(database::Error::KVStoreInvalidKey(
  985. "If primary_namespace is empty, secondary_namespace must also be empty".to_string(),
  986. ));
  987. }
  988. let txn = self.txn()?;
  989. let table = txn.open_table(KV_STORE_TABLE).map_err(Error::from)?;
  990. let mut keys = Vec::new();
  991. // Use range iterator for efficient lookup by namespace prefix
  992. // Range from (primary, secondary, "") to (primary, secondary, "\u{10FFFF}") to get all keys in this namespace
  993. let start = (primary_namespace, secondary_namespace, "");
  994. let end = (primary_namespace, secondary_namespace, "\u{10FFFF}");
  995. for result in table.range(start..=end).map_err(Error::from)? {
  996. let (key_tuple, _) = result.map_err(Error::from)?;
  997. let (_primary, _secondary, k) = key_tuple.value();
  998. keys.push(k.to_string());
  999. }
  1000. // Keys are already sorted by the B-tree structure
  1001. Ok(keys)
  1002. }
  1003. }
  1004. #[async_trait]
  1005. impl DbTransactionFinalizer for RedbWalletTransaction {
  1006. type Err = database::Error;
  1007. async fn commit(mut self: Box<Self>) -> Result<(), database::Error> {
  1008. if let Some(txn) = self.write_txn.take() {
  1009. txn.commit().map_err(Error::from)?;
  1010. }
  1011. Ok(())
  1012. }
  1013. async fn rollback(mut self: Box<Self>) -> Result<(), database::Error> {
  1014. if let Some(txn) = self.write_txn.take() {
  1015. txn.abort().map_err(Error::from)?;
  1016. }
  1017. Ok(())
  1018. }
  1019. }
  1020. impl Drop for RedbWalletTransaction {
  1021. fn drop(&mut self) {
  1022. if let Some(txn) = self.write_txn.take() {
  1023. let _ = txn.abort();
  1024. }
  1025. }
  1026. }