|
@@ -1,27 +1,33 @@
|
|
|
+//! SQLite storage layer for Verax
|
|
|
use crate::{
|
|
|
amount::AmountCents,
|
|
|
asset::AssetId,
|
|
|
+ changelog::Changelog,
|
|
|
+ payment::{self, SpentInfo},
|
|
|
storage::{Error, Storage},
|
|
|
- transaction::{from_db, Type},
|
|
|
+ transaction::{self, from_db, Type},
|
|
|
AccountId, Amount, Asset, AssetManager, Payment, PaymentId, Status, TransactionId,
|
|
|
};
|
|
|
use chrono::NaiveDateTime;
|
|
|
use futures::TryStreamExt;
|
|
|
-use sqlx::{sqlite::SqliteRow, Executor, Row};
|
|
|
-use std::{collections::HashMap, marker::PhantomData};
|
|
|
+use serde::{de::DeserializeOwned, Serialize};
|
|
|
+use sqlx::{pool::PoolConnection, sqlite::SqliteRow, Executor, Row, Sqlite, SqliteConnection};
|
|
|
+use std::{collections::HashMap, marker::PhantomData, ops::Deref};
|
|
|
|
|
|
mod batch;
|
|
|
|
|
|
pub use batch::Batch;
|
|
|
-pub use sqlx::sqlite::SqlitePoolOptions;
|
|
|
+pub use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
|
|
|
|
|
-pub struct Sqlite<'a> {
|
|
|
+/// SQLite storage layer for Verax
|
|
|
+pub struct SQLite<'a> {
|
|
|
db: sqlx::SqlitePool,
|
|
|
asset_manager: AssetManager,
|
|
|
_phantom: PhantomData<&'a ()>,
|
|
|
}
|
|
|
|
|
|
-impl<'a> Sqlite<'a> {
|
|
|
+impl<'a> SQLite<'a> {
|
|
|
+ /// Creates a new Verax SQLite storage layer. It takes an instance of the asset_manager
|
|
|
pub fn new(db: sqlx::SqlitePool, asset_manager: AssetManager) -> Self {
|
|
|
Self {
|
|
|
db,
|
|
@@ -30,6 +36,7 @@ impl<'a> Sqlite<'a> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Creates all the tables and indexes that are needed
|
|
|
pub async fn setup(&self) -> Result<(), sqlx::Error> {
|
|
|
let mut x = self.db.begin().await?;
|
|
|
x.execute(
|
|
@@ -41,6 +48,7 @@ impl<'a> Sqlite<'a> {
|
|
|
"status" INTEGER NOT NULL,
|
|
|
"to" VARCHAR(71) NOT NULL,
|
|
|
"spent_by" TEXT,
|
|
|
+ "spent_by_status" INTEGER DEFAULT NULL,
|
|
|
"created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
"updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
PRIMARY KEY ("transaction_id", "position_id")
|
|
@@ -51,6 +59,7 @@ impl<'a> Sqlite<'a> {
|
|
|
"status" INTEGER NOT NULL,
|
|
|
"type" INTEGER NOT NULL,
|
|
|
"reference" TEXT NOT NULL,
|
|
|
+ "last_version" BLOB NOT NULL,
|
|
|
"created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
"updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
PRIMARY KEY ("transaction_id")
|
|
@@ -72,6 +81,15 @@ impl<'a> Sqlite<'a> {
|
|
|
"updated_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
PRIMARY KEY ("transaction_id", "payment_transaction_id", "payment_position_id")
|
|
|
);
|
|
|
+ CREATE TABLE IF NOT EXISTS "changelog" (
|
|
|
+ "id" BLOB NOT NULL,
|
|
|
+ "object_id" BLOB NOT NULL,
|
|
|
+ "previous" BLOB DEFAULT NULL,
|
|
|
+ "change" BLOB NOT NULL,
|
|
|
+ "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ PRIMARY KEY ("id")
|
|
|
+ );
|
|
|
+ CREATE INDEX IF NOT EXISTS "changelog_object_id" ON "changelog" ("object_id", "created_at");
|
|
|
"#,
|
|
|
)
|
|
|
.await
|
|
@@ -81,7 +99,46 @@ impl<'a> Sqlite<'a> {
|
|
|
}
|
|
|
|
|
|
#[inline]
|
|
|
- fn sql_row_to_payment(&self, row: SqliteRow) -> Result<Payment, Error> {
|
|
|
+ async fn get_changelogs_internal<T: DeserializeOwned + Serialize + Send + Sync>(
|
|
|
+ &self,
|
|
|
+ conn: &mut SqliteConnection,
|
|
|
+ object_id: Vec<u8>,
|
|
|
+ ) -> Result<Vec<Changelog<T>>, Error> {
|
|
|
+ let rows = sqlx::query(
|
|
|
+ r#"SELECT "previous", "change", "created_at", "id" FROM "changelog" WHERE "object_id" = ? ORDER BY "created_at" ASC"#,
|
|
|
+ ).bind(&object_id).fetch_all(&mut *conn).await.map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+
|
|
|
+ let mut results = vec![];
|
|
|
+
|
|
|
+ for row in rows.into_iter() {
|
|
|
+ let change: T = bincode::deserialize(
|
|
|
+ &row.try_get::<Vec<u8>, usize>(1)
|
|
|
+ .map_err(|_| Error::Storage("Invalid change content".to_string()))?,
|
|
|
+ )
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+ let created_at = row
|
|
|
+ .try_get::<NaiveDateTime, usize>(2)
|
|
|
+ .map_err(|e| Error::InvalidDate(e.to_string()))?
|
|
|
+ .and_utc();
|
|
|
+
|
|
|
+ results.push(Changelog::new_from_db(
|
|
|
+ row.try_get::<Option<Vec<u8>>, usize>(0)
|
|
|
+ .map_err(|_| Error::Storage("Invalid previous content".to_string()))?,
|
|
|
+ object_id.clone(),
|
|
|
+ change,
|
|
|
+ created_at,
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
+ Ok(results)
|
|
|
+ }
|
|
|
+
|
|
|
+ #[inline]
|
|
|
+ async fn sql_row_to_payment(
|
|
|
+ &self,
|
|
|
+ conn: &mut SqliteConnection,
|
|
|
+ row: SqliteRow,
|
|
|
+ ) -> Result<Payment, Error> {
|
|
|
let id = PaymentId {
|
|
|
transaction: row
|
|
|
.try_get::<String, usize>(0)
|
|
@@ -101,6 +158,31 @@ impl<'a> Sqlite<'a> {
|
|
|
.map_err(|_| Error::Storage("Invalid cents".to_string()))?
|
|
|
.into();
|
|
|
|
|
|
+ let changelog = self
|
|
|
+ .get_changelogs_internal::<payment::ChangelogEntry>(conn, id.bytes().to_vec())
|
|
|
+ .await?;
|
|
|
+
|
|
|
+ let spent_by: Option<TransactionId> = row
|
|
|
+ .try_get::<Option<String>, usize>(6)
|
|
|
+ .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?
|
|
|
+ .map(|s| s.as_str().try_into())
|
|
|
+ .transpose()
|
|
|
+ .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?;
|
|
|
+
|
|
|
+ let spent_status = row
|
|
|
+ .try_get::<Option<u32>, usize>(7)
|
|
|
+ .map_err(|_| Error::Storage("Invalid spent_by_status".to_string()))?
|
|
|
+ .map(|status| Status::try_from(status))
|
|
|
+ .transpose()
|
|
|
+ .map_err(|_| Error::Storage("Invalid spent_by_status".to_string()))?;
|
|
|
+
|
|
|
+ if spent_by.is_some() != spent_status.is_some() {
|
|
|
+ panic!("{:?} {:?}", spent_by, spent_status);
|
|
|
+ return Err(Error::Storage(
|
|
|
+ "Invalid spent_by and spent_by_status combination".to_string(),
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
Ok(Payment {
|
|
|
id,
|
|
|
amount: self
|
|
@@ -124,18 +206,17 @@ impl<'a> Sqlite<'a> {
|
|
|
.map_err(|_| Error::Storage("Invalid `status`".to_string()))?
|
|
|
.try_into()
|
|
|
.map_err(|_| Error::Storage("Invalid status".to_string()))?,
|
|
|
- spent_by: row
|
|
|
- .try_get::<Option<String>, usize>(6)
|
|
|
- .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?
|
|
|
- .map(|s| s.as_str().try_into())
|
|
|
- .transpose()
|
|
|
- .map_err(|_| Error::Storage("Invalid spent_by".to_string()))?,
|
|
|
+ changelog,
|
|
|
+ spent: spent_by.map(|status| SpentInfo {
|
|
|
+ by: status,
|
|
|
+ status: spent_status.unwrap_or_default(),
|
|
|
+ }),
|
|
|
})
|
|
|
}
|
|
|
}
|
|
|
|
|
|
#[async_trait::async_trait]
|
|
|
-impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
+impl<'a> Storage<'a, Batch<'a>> for SQLite<'a> {
|
|
|
async fn begin(&'a self) -> Result<Batch<'a>, Error> {
|
|
|
self.db
|
|
|
.begin()
|
|
@@ -144,7 +225,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.map_err(|x| Error::Storage(x.to_string()))
|
|
|
}
|
|
|
|
|
|
- async fn get_payment(&self, id: PaymentId) -> Result<Payment, Error> {
|
|
|
+ async fn get_payment(&self, id: &PaymentId) -> Result<Payment, Error> {
|
|
|
let mut conn = self
|
|
|
.db
|
|
|
.acquire()
|
|
@@ -160,7 +241,8 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"p"."cents",
|
|
|
"p"."to",
|
|
|
"p"."status",
|
|
|
- "p"."spent_by"
|
|
|
+ "p"."spent_by",
|
|
|
+ "p"."spent_by_status"
|
|
|
FROM
|
|
|
"payments" "p"
|
|
|
WHERE
|
|
@@ -176,7 +258,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?
|
|
|
.ok_or(Error::NotFound)?;
|
|
|
|
|
|
- self.sql_row_to_payment(row)
|
|
|
+ Ok(self.sql_row_to_payment(&mut *conn, row).await?)
|
|
|
}
|
|
|
|
|
|
async fn get_balance(&self, account: &AccountId) -> Result<Vec<Amount>, Error> {
|
|
@@ -249,7 +331,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.acquire()
|
|
|
.await
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
- let mut result = sqlx::query(
|
|
|
+ let results = sqlx::query(
|
|
|
r#"
|
|
|
SELECT
|
|
|
"p"."transaction_id",
|
|
@@ -258,7 +340,8 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"p"."cents",
|
|
|
"p"."to",
|
|
|
"p"."status",
|
|
|
- "p"."spent_by"
|
|
|
+ "p"."spent_by",
|
|
|
+ "p"."spent_by_status"
|
|
|
FROM
|
|
|
"payments" as "p"
|
|
|
WHERE
|
|
@@ -269,15 +352,13 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.bind(account.to_string())
|
|
|
.bind(asset.to_string())
|
|
|
.bind::<u32>(Status::Settled.into())
|
|
|
- .fetch(&mut *conn);
|
|
|
+ .fetch_all(&mut *conn)
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
|
|
|
let mut to_return = vec![];
|
|
|
- while let Some(row) = result
|
|
|
- .try_next()
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?
|
|
|
- {
|
|
|
- let row = self.sql_row_to_payment(row)?;
|
|
|
+ for row in results.into_iter() {
|
|
|
+ let row = self.sql_row_to_payment(&mut *conn, row).await?;
|
|
|
target_amount -= row.amount.cents();
|
|
|
to_return.push(row);
|
|
|
if target_amount <= 0 {
|
|
@@ -308,6 +389,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"t"."status",
|
|
|
"t"."type",
|
|
|
"t"."reference",
|
|
|
+ "t"."last_version",
|
|
|
"t"."created_at",
|
|
|
"t"."updated_at"
|
|
|
FROM
|
|
@@ -322,7 +404,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?
|
|
|
.ok_or(Error::NotFound)?;
|
|
|
|
|
|
- let mut spend_result = sqlx::query(
|
|
|
+ let results = sqlx::query(
|
|
|
r#"
|
|
|
SELECT
|
|
|
"p"."transaction_id",
|
|
@@ -331,7 +413,8 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"p"."cents",
|
|
|
"p"."to",
|
|
|
"p"."status",
|
|
|
- "p"."spent_by"
|
|
|
+ "p"."spent_by",
|
|
|
+ "p"."spent_by_status"
|
|
|
FROM
|
|
|
"transaction_input_payments" "tp"
|
|
|
INNER JOIN
|
|
@@ -345,21 +428,16 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"#,
|
|
|
)
|
|
|
.bind(transaction_id.to_string())
|
|
|
- .fetch(&mut *conn);
|
|
|
+ .fetch_all(&mut *conn)
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
|
|
|
let mut spend = vec![];
|
|
|
-
|
|
|
- while let Some(row) = spend_result
|
|
|
- .try_next()
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?
|
|
|
- {
|
|
|
- spend.push(self.sql_row_to_payment(row)?);
|
|
|
+ for row in results.into_iter() {
|
|
|
+ spend.push(self.sql_row_to_payment(&mut *conn, row).await?);
|
|
|
}
|
|
|
|
|
|
- drop(spend_result);
|
|
|
-
|
|
|
- let mut create_result = sqlx::query(
|
|
|
+ let results = sqlx::query(
|
|
|
r#"
|
|
|
SELECT
|
|
|
"p"."transaction_id",
|
|
@@ -368,7 +446,8 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"p"."cents",
|
|
|
"p"."to",
|
|
|
"p"."status",
|
|
|
- "p"."spent_by"
|
|
|
+ "p"."spent_by",
|
|
|
+ "p"."spent_by_status"
|
|
|
FROM
|
|
|
"payments" "p"
|
|
|
WHERE
|
|
@@ -376,16 +455,14 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
"#,
|
|
|
)
|
|
|
.bind(transaction_id.to_string())
|
|
|
- .fetch(&mut *conn);
|
|
|
+ .fetch_all(&mut *conn)
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
|
|
|
let mut create = vec![];
|
|
|
|
|
|
- while let Some(row) = create_result
|
|
|
- .try_next()
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?
|
|
|
- {
|
|
|
- create.push(self.sql_row_to_payment(row)?);
|
|
|
+ for row in results.into_iter() {
|
|
|
+ create.push(self.sql_row_to_payment(&mut *conn, row).await?);
|
|
|
}
|
|
|
|
|
|
let status = transaction_row
|
|
@@ -403,21 +480,34 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.try_get::<String, usize>(2)
|
|
|
.map_err(|_| Error::Storage("Invalid reference".to_string()))?;
|
|
|
|
|
|
+ let last_change = transaction_row
|
|
|
+ .try_get::<Vec<u8>, usize>(3)
|
|
|
+ .map_err(|_| Error::Storage("Invalid last_version".to_string()))?;
|
|
|
+
|
|
|
let created_at = transaction_row
|
|
|
- .try_get::<NaiveDateTime, usize>(3)
|
|
|
+ .try_get::<NaiveDateTime, usize>(4)
|
|
|
.map_err(|e| Error::InvalidDate(e.to_string()))?
|
|
|
.and_utc();
|
|
|
|
|
|
let updated_at = transaction_row
|
|
|
- .try_get::<NaiveDateTime, usize>(4)
|
|
|
+ .try_get::<NaiveDateTime, usize>(5)
|
|
|
.map_err(|e| Error::InvalidDate(e.to_string()))?
|
|
|
.and_utc();
|
|
|
|
|
|
+ let changelog = self
|
|
|
+ .get_changelogs_internal::<transaction::ChangelogEntry>(
|
|
|
+ &mut *conn,
|
|
|
+ transaction_id.deref().to_vec(),
|
|
|
+ )
|
|
|
+ .await?;
|
|
|
+
|
|
|
Ok(from_db::Transaction {
|
|
|
id: transaction_id.clone(),
|
|
|
spend,
|
|
|
create,
|
|
|
typ,
|
|
|
+ last_change,
|
|
|
+ changelog,
|
|
|
status,
|
|
|
reference,
|
|
|
created_at,
|
|
@@ -428,7 +518,7 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
async fn get_transactions(
|
|
|
&self,
|
|
|
account: &AccountId,
|
|
|
- typ: Option<Type>,
|
|
|
+ types: Vec<Type>,
|
|
|
) -> Result<Vec<from_db::Transaction>, Error> {
|
|
|
let mut conn = self
|
|
|
.db
|
|
@@ -436,14 +526,24 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
.await
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
|
|
|
- let sql = sqlx::query(if typ.is_some() {
|
|
|
- r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? AND "type" = ?" ORDER BY "created_at" DESC"#
|
|
|
+ let sql = if types.is_empty() {
|
|
|
+ r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? ORDER BY "created_at" DESC"#.to_owned()
|
|
|
} else {
|
|
|
- r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? ORDER BY "created_at" DESC"#
|
|
|
- }).bind(account.to_string());
|
|
|
+ let params = format!("?{}", ", ?".repeat(types.len() - 1));
|
|
|
+ format!(
|
|
|
+ r#"SELECT "transaction_id" FROM "transaction_accounts" WHERE "account_id" = ? AND "type" IN ({}) ORDER BY "created_at" DESC"#,
|
|
|
+ params
|
|
|
+ )
|
|
|
+ };
|
|
|
|
|
|
- let ids = if let Some(typ) = typ {
|
|
|
- sql.bind::<u32>(typ.into())
|
|
|
+ let sql = sqlx::query(&sql).bind(account.to_string());
|
|
|
+
|
|
|
+ let ids = if !types.is_empty() {
|
|
|
+ let mut sql = sql;
|
|
|
+ for typ in types.into_iter() {
|
|
|
+ sql = sql.bind::<u32>(typ.into());
|
|
|
+ }
|
|
|
+ sql
|
|
|
} else {
|
|
|
sql
|
|
|
}
|
|
@@ -471,4 +571,45 @@ impl<'a> Storage<'a, Batch<'a>> for Sqlite<'a> {
|
|
|
|
|
|
Ok(transactions)
|
|
|
}
|
|
|
+
|
|
|
+ async fn get_changelogs<T: DeserializeOwned + Serialize + Send + Sync>(
|
|
|
+ &self,
|
|
|
+ object_id: Vec<u8>,
|
|
|
+ ) -> Result<Vec<Changelog<T>>, Error> {
|
|
|
+ let mut conn = self
|
|
|
+ .db
|
|
|
+ .acquire()
|
|
|
+ .await
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+
|
|
|
+ self.get_changelogs_internal(&mut *conn, object_id).await
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[cfg(test)]
|
|
|
+mod test {
|
|
|
+ use super::*;
|
|
|
+ use crate::{storage_test_suite, AssetDefinition};
|
|
|
+ use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
|
|
|
+ use std::{fs::remove_file, path::Path};
|
|
|
+
|
|
|
+ async fn get_instance(assets: AssetManager, test: &str) -> SQLite<'static> {
|
|
|
+ let path = format!("/tmp/{}.db", test);
|
|
|
+ let _ = remove_file(&path);
|
|
|
+ let settings = path
|
|
|
+ .parse::<SqliteConnectOptions>()
|
|
|
+ .expect("valid settings")
|
|
|
+ .create_if_missing(true);
|
|
|
+
|
|
|
+ let pool = SqlitePoolOptions::new()
|
|
|
+ .connect_with(settings)
|
|
|
+ .await
|
|
|
+ .expect("pool");
|
|
|
+
|
|
|
+ let db = SQLite::new(pool, assets);
|
|
|
+ db.setup().await.expect("setup");
|
|
|
+ db
|
|
|
+ }
|
|
|
+
|
|
|
+ storage_test_suite!();
|
|
|
}
|