|
@@ -1,9 +1,10 @@
|
|
|
//! SQLite storage layer for Verax
|
|
|
+use super::{Cursor, PrimaryFilter, ReceivedPaymentStatus};
|
|
|
use crate::{
|
|
|
amount::AmountCents,
|
|
|
- storage::{Error, FilterId, Storage},
|
|
|
- transaction::{Revision, Type},
|
|
|
- AccountId, Amount, Asset, BaseTx, PaymentFrom, PaymentId, RevId, TxId,
|
|
|
+ storage::{Error, Storage},
|
|
|
+ transaction::Revision,
|
|
|
+ AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, TxId,
|
|
|
};
|
|
|
use borsh::from_slice;
|
|
|
use futures::TryStreamExt;
|
|
@@ -15,13 +16,238 @@ mod batch;
|
|
|
pub use batch::Batch;
|
|
|
pub use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
|
|
|
|
|
|
-use super::ReceivedPaymentStatus;
|
|
|
-
|
|
|
/// SQLite storage layer for Verax
|
|
|
pub struct SQLite {
|
|
|
db: sqlx::SqlitePool,
|
|
|
}
|
|
|
|
|
|
+/// Receives a cursor and does the initial loading of transactions that may match the requested
|
|
|
+/// filter.
|
|
|
+///
|
|
|
+/// The function will load the full transaction of candidates and return them.
|
|
|
+#[allow(warnings)]
|
|
|
+async fn find_candidates<'e, 'c: 'e, E>(
|
|
|
+ executor: E,
|
|
|
+ cursor: &mut Cursor,
|
|
|
+ limit: usize,
|
|
|
+) -> Result<Vec<(BaseTx, Revision)>, Error>
|
|
|
+where
|
|
|
+ E: sqlx::Executor<'c, Database = sqlx::Sqlite>,
|
|
|
+{
|
|
|
+ let since = if let Some(since) = cursor.filter.since {
|
|
|
+ format!("AND created_at <= {}", since.timestamp())
|
|
|
+ } else {
|
|
|
+ "".to_owned()
|
|
|
+ };
|
|
|
+ let until = if let Some(until) = cursor.filter.until {
|
|
|
+ format!("AND created_at >= {}", until.timestamp())
|
|
|
+ } else {
|
|
|
+ "".to_owned()
|
|
|
+ };
|
|
|
+
|
|
|
+ let rows = match &cursor.primary_filter {
|
|
|
+ PrimaryFilter::TxId(ids) => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "transactions" as "t",
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "t"."transaction_id" IN ({})
|
|
|
+ AND "t"."revision_id" = "b"."revision_id"
|
|
|
+ AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "t"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ "?,".repeat(ids.len()).trim_end_matches(","),
|
|
|
+ since,
|
|
|
+ until,
|
|
|
+ limit,
|
|
|
+ cursor.filter.skip
|
|
|
+ );
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for id in ids.iter() {
|
|
|
+ query = query.bind(id.to_string());
|
|
|
+ }
|
|
|
+ query
|
|
|
+ .bind(limit as i64)
|
|
|
+ .bind(cursor.filter.skip as i64)
|
|
|
+ .fetch_all(executor)
|
|
|
+ .await
|
|
|
+ }
|
|
|
+ PrimaryFilter::Revision(rev_ids) => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "b"."revision_id" IN ({})
|
|
|
+ AND "b"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "b"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ "?,".repeat(rev_ids.len()).trim_end_matches(","),
|
|
|
+ since,
|
|
|
+ until,
|
|
|
+ limit,
|
|
|
+ cursor.filter.skip
|
|
|
+ );
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for id in rev_ids.iter() {
|
|
|
+ query = query.bind(id.to_string());
|
|
|
+ }
|
|
|
+ query.fetch_all(executor).await
|
|
|
+ }
|
|
|
+ PrimaryFilter::Account(account_ids) => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "transaction_accounts" as "ta",
|
|
|
+ "transactions" as "t",
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "ta"."account_id" IN ({})
|
|
|
+ AND "t"."transaction_id" = "ta"."transaction_id"
|
|
|
+ AND "t"."revision_id" = "b"."revision_id"
|
|
|
+ AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "ta"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ "?,".repeat(account_ids.len()).trim_end_matches(","),
|
|
|
+ since,
|
|
|
+ until,
|
|
|
+ limit,
|
|
|
+ cursor.filter.skip
|
|
|
+ );
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for id in account_ids.iter() {
|
|
|
+ query = query.bind(id.to_string());
|
|
|
+ }
|
|
|
+ query.fetch_all(executor).await
|
|
|
+ }
|
|
|
+ PrimaryFilter::Type(types) => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "transactions" as "t",
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "bt"."type" IN ({})
|
|
|
+ AND "t"."revision_id" = "b"."revision_id"
|
|
|
+ AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "bt"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ "?,".repeat(types.len()).trim_end_matches(","),
|
|
|
+ since,
|
|
|
+ until,
|
|
|
+ limit,
|
|
|
+ cursor.filter.skip
|
|
|
+ );
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for id in types.iter() {
|
|
|
+ query = query.bind(id.to_string());
|
|
|
+ }
|
|
|
+ query.fetch_all(executor).await
|
|
|
+ }
|
|
|
+ PrimaryFilter::Tags(tags) => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "transactions" as "t",
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "t"."transaction_id" IN (SELECT DISTINCT "transaction_id" FROM "transactions_by_tags" WHERE "tag" IN ({}) ORDER BY "created_at" DESC)
|
|
|
+ AND "t"."revision_id" = "b"."revision_id"
|
|
|
+ AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "t"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ "?,".repeat(tags.len()).trim_end_matches(","),
|
|
|
+ since,
|
|
|
+ until,
|
|
|
+ limit,
|
|
|
+ cursor.filter.skip
|
|
|
+ );
|
|
|
+
|
|
|
+ let mut query = sqlx::query(&sql);
|
|
|
+ for id in tags.iter() {
|
|
|
+ query = query.bind(id.to_string());
|
|
|
+ }
|
|
|
+ query.fetch_all(executor).await
|
|
|
+ }
|
|
|
+ PrimaryFilter::Stream => {
|
|
|
+ let sql = format!(
|
|
|
+ r#"
|
|
|
+ SELECT
|
|
|
+ "bt"."blob",
|
|
|
+ "b"."blob"
|
|
|
+ FROM
|
|
|
+ "transactions" as "t",
|
|
|
+ "base_transactions" as "bt",
|
|
|
+ "revisions" as "b"
|
|
|
+ WHERE
|
|
|
+ "t"."revision_id" = "b"."revision_id"
|
|
|
+ AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
+ {} {}
|
|
|
+ ORDER BY "t"."created_at" DESC
|
|
|
+ LIMIT {} OFFSET {}
|
|
|
+ "#,
|
|
|
+ since, until, limit, cursor.filter.skip
|
|
|
+ );
|
|
|
+ sqlx::query(&sql).fetch_all(executor).await
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let results = rows
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?
|
|
|
+ .into_iter()
|
|
|
+ .map(|row| {
|
|
|
+ let base_tx = row
|
|
|
+ .try_get::<Vec<u8>, usize>(0)
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+
|
|
|
+ let revision = row
|
|
|
+ .try_get::<Vec<u8>, usize>(1)
|
|
|
+ .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+
|
|
|
+ Ok((
|
|
|
+ from_slice::<BaseTx>(&base_tx)?,
|
|
|
+ from_slice::<Revision>(&revision)?,
|
|
|
+ ))
|
|
|
+ })
|
|
|
+ .collect::<Result<Vec<_>, Error>>()?;
|
|
|
+
|
|
|
+ cursor.filter.skip += results.len();
|
|
|
+
|
|
|
+ Ok(results)
|
|
|
+}
|
|
|
+
|
|
|
impl SQLite {
|
|
|
/// Creates a new Verax SQLite storage layer. It takes an instance of the asset_manager
|
|
|
pub fn new(db: sqlx::SqlitePool) -> Self {
|
|
@@ -59,6 +285,13 @@ impl SQLite {
|
|
|
"blob" BINARY NOT NULL,
|
|
|
"created_at" DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
|
);
|
|
|
+ CREATE TABLE IF NOT EXISTS "transactions_by_tags" (
|
|
|
+ "transaction_id" VARCHAR(67),
|
|
|
+ "tag" VARCHAR(250) NOT NULL,
|
|
|
+ "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ PRIMARY KEY ("tag", "created_at", "transaction_id")
|
|
|
+ );
|
|
|
+ CREATE INDEX IF NOT EXISTS "transaction_id_and_tags" ON "transactions_by_tags" ("transaction_id");
|
|
|
CREATE TABLE IF NOT EXISTS "payments" (
|
|
|
"payment_id" VARCHAR(80) NOT NULL PRIMARY KEY,
|
|
|
"to" VARCHAR(64) NOT NULL,
|
|
@@ -305,156 +538,66 @@ impl Storage for SQLite {
|
|
|
|
|
|
Ok(sqlx::query(
|
|
|
r#"
|
|
|
- SELECT "revision_id" FROM "revisions" WHERE "transaction_id" = ? ORDER BY "created_at" ASC
|
|
|
+ SELECT
|
|
|
+ "revision_id"
|
|
|
+ FROM
|
|
|
+ "revisions"
|
|
|
+ WHERE
|
|
|
+ "transaction_id" = ?
|
|
|
+ ORDER BY "created_at" ASC
|
|
|
"#,
|
|
|
)
|
|
|
.bind(transaction_id.to_string())
|
|
|
.fetch_all(&mut *conn)
|
|
|
.await
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?
|
|
|
- .into_iter().map(|x| {
|
|
|
- x.try_get::<String, usize>(0)
|
|
|
+ .into_iter()
|
|
|
+ .map(|x| {
|
|
|
+ x.try_get::<String, usize>(0)
|
|
|
.map(|x| RevId::from_str(&x))
|
|
|
.map_err(|e| Error::Storage(e.to_string()))
|
|
|
- }).collect::<Result<Result<Vec<_>, _>, _>>()??)
|
|
|
+ })
|
|
|
+ .collect::<Result<Result<Vec<_>, _>, _>>()??)
|
|
|
}
|
|
|
|
|
|
- async fn get_transaction_and_revision(
|
|
|
+ async fn find_base_tx_and_revision(
|
|
|
&self,
|
|
|
- id: FilterId,
|
|
|
- ) -> Result<(BaseTx, Revision), Error> {
|
|
|
+ filter: Filter,
|
|
|
+ ) -> Result<Vec<(BaseTx, Revision)>, Error> {
|
|
|
let mut conn = self
|
|
|
.db
|
|
|
.acquire()
|
|
|
.await
|
|
|
.map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
- let row = sqlx::query(match id {
|
|
|
- FilterId::LatestRevision(_) => {
|
|
|
- r#"
|
|
|
- SELECT
|
|
|
- "bt"."blob",
|
|
|
- "b"."blob"
|
|
|
- FROM
|
|
|
- "transactions" as "t",
|
|
|
- "base_transactions" as "bt",
|
|
|
- "revisions" as "b"
|
|
|
- WHERE
|
|
|
- "t"."transaction_id" = ?
|
|
|
- AND "t"."revision_id" = "b"."revision_id"
|
|
|
- AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
- "#
|
|
|
- }
|
|
|
- FilterId::RevisionId(_) => {
|
|
|
- r#"
|
|
|
- SELECT
|
|
|
- "bt"."blob",
|
|
|
- "b"."blob"
|
|
|
- FROM
|
|
|
- "base_transactions" as "bt",
|
|
|
- "revisions" as "b"
|
|
|
- WHERE
|
|
|
- "b"."revision_id" = ?
|
|
|
- AND "b"."transaction_id" = "bt"."transaction_id"
|
|
|
- "#
|
|
|
- }
|
|
|
- })
|
|
|
- .bind(id.to_string())
|
|
|
- .fetch_one(&mut *conn)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
|
|
|
- let transaction = row
|
|
|
- .try_get::<Vec<u8>, usize>(0)
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+ let mut cursor: Cursor = filter.into();
|
|
|
|
|
|
- let revision = row
|
|
|
- .try_get::<Vec<u8>, usize>(1)
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+ let candidate_limits = 20;
|
|
|
+ let mut results = vec![];
|
|
|
|
|
|
- Ok((
|
|
|
- from_slice::<BaseTx>(&transaction)?,
|
|
|
- from_slice::<Revision>(&revision)?,
|
|
|
- ))
|
|
|
- }
|
|
|
-
|
|
|
- async fn get_transactions_with_latest_revision(
|
|
|
- &self,
|
|
|
- account: &AccountId,
|
|
|
- types: &[Type],
|
|
|
- _tags: &[String],
|
|
|
- ) -> Result<Vec<(BaseTx, Revision)>, Error> {
|
|
|
- let mut conn = self
|
|
|
- .db
|
|
|
- .acquire()
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
+ loop {
|
|
|
+ let candidates = find_candidates(&mut *conn, &mut cursor, candidate_limits).await?;
|
|
|
+ // if there are fewer candidates than requested it means there are no more resultset
|
|
|
+ let no_more_candidates = candidates.len() != candidate_limits;
|
|
|
|
|
|
- let sql = if types.is_empty() {
|
|
|
- r#"SELECT
|
|
|
- "t"."transaction_id",
|
|
|
- "t"."revision_id",
|
|
|
- "bt"."blob",
|
|
|
- "b"."blob"
|
|
|
- FROM
|
|
|
- "transaction_accounts" as "ta",
|
|
|
- "base_transactions" as "bt",
|
|
|
- "transactions" as "t",
|
|
|
- "revisions" as "b"
|
|
|
- WHERE
|
|
|
- "ta"."account_id" = ?
|
|
|
- AND "t"."transaction_id" = "ta"."transaction_id"
|
|
|
- AND "t"."revision_id" = "b"."revision_id"
|
|
|
- AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
- ORDER BY "ta"."id" DESC"#
|
|
|
- .to_owned()
|
|
|
- } else {
|
|
|
- let types = types
|
|
|
+ let mut iteration_result = candidates
|
|
|
.into_iter()
|
|
|
- .map(|t| u32::from(t).to_string())
|
|
|
- .collect::<Vec<_>>()
|
|
|
- .join(",");
|
|
|
- format!(
|
|
|
- r#"SELECT
|
|
|
- "t"."transaction_id",
|
|
|
- "t"."revision_id",
|
|
|
- "bt"."blob",
|
|
|
- "b"."blob"
|
|
|
- FROM
|
|
|
- "transaction_accounts" as "ta",
|
|
|
- "base_transactions" as "bt",
|
|
|
- "transactions" as "t",
|
|
|
- "revisions" as "b"
|
|
|
- WHERE
|
|
|
- "account_id" = ?
|
|
|
- AND "t"."transaction_id" = "ta"."transaction_id"
|
|
|
- AND "t"."revision_id" = "b"."revision_id"
|
|
|
- AND "t"."transaction_id" = "bt"."transaction_id"
|
|
|
- AND "ta"."type" IN ({types})
|
|
|
- ORDER BY "ta"."id" DESC"#,
|
|
|
- )
|
|
|
- };
|
|
|
-
|
|
|
- sqlx::query(&sql)
|
|
|
- .bind(account.to_string())
|
|
|
- .fetch_all(&mut *conn)
|
|
|
- .await
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?
|
|
|
- .into_iter()
|
|
|
- .map(|row| {
|
|
|
- let base_tx = row
|
|
|
- .try_get::<Vec<u8>, usize>(2)
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
-
|
|
|
- let revision = row
|
|
|
- .try_get::<Vec<u8>, usize>(3)
|
|
|
- .map_err(|e| Error::Storage(e.to_string()))?;
|
|
|
-
|
|
|
- Ok((
|
|
|
- from_slice::<BaseTx>(&base_tx)?,
|
|
|
- from_slice::<Revision>(&revision)?,
|
|
|
- ))
|
|
|
- })
|
|
|
- .collect::<Result<Vec<_>, _>>()
|
|
|
+ .filter(|(base, revision)| cursor.matches(base, revision))
|
|
|
+ .collect::<Vec<_>>();
|
|
|
+
|
|
|
+ results.append(&mut iteration_result);
|
|
|
+
|
|
|
+ if no_more_candidates
|
|
|
+ || (cursor.filter.limit != 0 && cursor.filter.limit >= results.len())
|
|
|
+ {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if cursor.filter.limit != 0 {
|
|
|
+ results.truncate(cursor.filter.limit);
|
|
|
+ }
|
|
|
+ Ok(results)
|
|
|
}
|
|
|
}
|
|
|
|