|
@@ -36,25 +36,18 @@ impl SqlStore {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/// Run database migrations. Idempotent: a `_migrations` ledger records what
|
|
/// Run database migrations. Idempotent: a `_migrations` ledger records what
|
|
|
- /// has been applied, so re-running is a no-op. The DDL is selected per
|
|
|
|
|
- /// backend (SQLite uses `BLOB`, PostgreSQL uses `BYTEA`).
|
|
|
|
|
|
|
+ /// has been applied, so re-running is a no-op. Every column is a text type,
|
|
|
|
|
+ /// so the store holds no opaque binary and the DDL is identical for both
|
|
|
|
|
+ /// backends. Content-addressed ids and opaque saga bytes are stored as hex
|
|
|
|
|
+ /// `TEXT`, and JSON payloads as their `TEXT` serialization, keeping every
|
|
|
|
|
+ /// row legible for auditing.
|
|
|
pub async fn migrate(&self) -> Result<(), StoreError> {
|
|
pub async fn migrate(&self) -> Result<(), StoreError> {
|
|
|
- // Detect the backend. `sqlite_version()` exists only on SQLite.
|
|
|
|
|
- let is_sqlite = sqlx::query("SELECT sqlite_version()")
|
|
|
|
|
- .fetch_optional(&self.pool)
|
|
|
|
|
- .await
|
|
|
|
|
- .is_ok();
|
|
|
|
|
-
|
|
|
|
|
sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
|
|
sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
|
|
|
.execute(&self.pool)
|
|
.execute(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
|
|
|
|
|
- let migrations: &[(&str, &str)] = if is_sqlite {
|
|
|
|
|
- &[("001_init", include_str!("migrations/sqlite/001_init.sql"))]
|
|
|
|
|
- } else {
|
|
|
|
|
- &[("001_init", include_str!("migrations/postgres/001_init.sql"))]
|
|
|
|
|
- };
|
|
|
|
|
|
|
+ let migrations: &[(&str, &str)] = &[("001_init", include_str!("migrations/001_init.sql"))];
|
|
|
|
|
|
|
|
for (name, sql) in migrations {
|
|
for (name, sql) in migrations {
|
|
|
let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
|
|
let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
|
|
@@ -99,12 +92,51 @@ fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
|
|
|
serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
|
|
serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-fn serialize_blob<T: serde::Serialize>(val: &T) -> Result<Vec<u8>, StoreError> {
|
|
|
|
|
- serde_json::to_vec(val).map_err(|e| StoreError::Internal(format!("blob serialization: {e}")))
|
|
|
|
|
|
|
+/// Serialize a value to a JSON string. Payload columns store JSON as `TEXT` so
|
|
|
|
|
+/// the database is directly readable for auditing; the ledger never queries
|
|
|
|
|
+/// into the JSON, so no binary or indexed representation is needed.
|
|
|
|
|
+fn serialize_json<T: serde::Serialize>(val: &T) -> Result<String, StoreError> {
|
|
|
|
|
+ serde_json::to_string(val).map_err(|e| StoreError::Internal(format!("json serialization: {e}")))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-fn deserialize_blob<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, StoreError> {
|
|
|
|
|
- serde_json::from_slice(bytes).map_err(|e| StoreError::Internal(format!("bad blob: {e}")))
|
|
|
|
|
|
|
+fn deserialize_json<T: serde::de::DeserializeOwned>(s: &str) -> Result<T, StoreError> {
|
|
|
|
|
+ serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad json: {e}")))
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+/// Lower-case hex encoding. Binary identifiers (content-addressed hashes) and
|
|
|
|
|
+/// opaque saga bytes are stored as hex `TEXT` so a row is legible in any SQL
|
|
|
|
|
+/// client and matches the hex form used in logs and `Debug` output.
|
|
|
|
|
+fn to_hex(bytes: &[u8]) -> String {
|
|
|
|
|
+ let mut s = String::with_capacity(bytes.len() * 2);
|
|
|
|
|
+ for b in bytes {
|
|
|
|
|
+ s.push_str(&format!("{b:02x}"));
|
|
|
|
|
+ }
|
|
|
|
|
+ s
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn from_hex(s: &str) -> Result<Vec<u8>, StoreError> {
|
|
|
|
|
+ if s.len() % 2 != 0 {
|
|
|
|
|
+ return Err(StoreError::Internal(format!("odd-length hex: {s:?}")));
|
|
|
|
|
+ }
|
|
|
|
|
+ (0..s.len())
|
|
|
|
|
+ .step_by(2)
|
|
|
|
|
+ .map(|i| {
|
|
|
|
|
+ u8::from_str_radix(&s[i..i + 2], 16)
|
|
|
|
|
+ .map_err(|e| StoreError::Internal(format!("bad hex: {e}")))
|
|
|
|
|
+ })
|
|
|
|
|
+ .collect()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn envelope_id_to_hex(id: &EnvelopeId) -> String {
|
|
|
|
|
+ to_hex(&id.0)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+fn envelope_id_from_hex(s: &str) -> Result<EnvelopeId, StoreError> {
|
|
|
|
|
+ let bytes = from_hex(s)?;
|
|
|
|
|
+ let arr: [u8; 32] = bytes.as_slice().try_into().map_err(|_| {
|
|
|
|
|
+ StoreError::Internal(format!("expected 32-byte id, got {} bytes", bytes.len()))
|
|
|
|
|
+ })?;
|
|
|
|
|
+ Ok(EnvelopeId(arr))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
fn status_to_i16(s: PostingStatus) -> i16 {
|
|
fn status_to_i16(s: PostingStatus) -> i16 {
|
|
@@ -140,10 +172,10 @@ fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
|
|
|
let book: i64 = row
|
|
let book: i64 = row
|
|
|
.try_get("book")
|
|
.try_get("book")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let user_data_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let user_data_json: String = row
|
|
|
.try_get("user_data")
|
|
.try_get("user_data")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let metadata_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let metadata_json: String = row
|
|
|
.try_get("metadata")
|
|
.try_get("metadata")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
|
|
|
|
@@ -153,13 +185,13 @@ fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
|
|
|
policy: deserialize_policy(&policy_str)?,
|
|
policy: deserialize_policy(&policy_str)?,
|
|
|
flags: AccountFlags::from_bits_truncate(flags_bits as u32),
|
|
flags: AccountFlags::from_bits_truncate(flags_bits as u32),
|
|
|
book: BookId::new(book),
|
|
book: BookId::new(book),
|
|
|
- user_data: deserialize_blob(&user_data_bytes)?,
|
|
|
|
|
- metadata: deserialize_blob(&metadata_bytes)?,
|
|
|
|
|
|
|
+ user_data: deserialize_json(&user_data_json)?,
|
|
|
|
|
+ metadata: deserialize_json(&metadata_json)?,
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
|
|
fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
|
|
|
- let transfer_id: Vec<u8> = row
|
|
|
|
|
|
|
+ let transfer_id: String = row
|
|
|
.try_get("transfer_id")
|
|
.try_get("transfer_id")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
let idx: i16 = row
|
|
let idx: i16 = row
|
|
@@ -182,12 +214,9 @@ fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
|
|
|
.try_get("reservation")
|
|
.try_get("reservation")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
|
|
|
|
|
- let mut tid = [0u8; 32];
|
|
|
|
|
- tid.copy_from_slice(&transfer_id);
|
|
|
|
|
-
|
|
|
|
|
Ok(Posting {
|
|
Ok(Posting {
|
|
|
id: PostingId {
|
|
id: PostingId {
|
|
|
- transfer: EnvelopeId(tid),
|
|
|
|
|
|
|
+ transfer: envelope_id_from_hex(&transfer_id)?,
|
|
|
index: idx as u16,
|
|
index: idx as u16,
|
|
|
},
|
|
},
|
|
|
owner: AccountId::new(owner),
|
|
owner: AccountId::new(owner),
|
|
@@ -243,8 +272,8 @@ impl AccountStore for SqlStore {
|
|
|
.bind(serialize_policy(&account.policy)?)
|
|
.bind(serialize_policy(&account.policy)?)
|
|
|
.bind(account.flags.bits() as i32)
|
|
.bind(account.flags.bits() as i32)
|
|
|
.bind(account.book.0)
|
|
.bind(account.book.0)
|
|
|
- .bind(serialize_blob(&account.user_data)?)
|
|
|
|
|
- .bind(serialize_blob(&account.metadata)?)
|
|
|
|
|
|
|
+ .bind(serialize_json(&account.user_data)?)
|
|
|
|
|
+ .bind(serialize_json(&account.metadata)?)
|
|
|
.execute(&self.pool)
|
|
.execute(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
@@ -283,8 +312,8 @@ impl AccountStore for SqlStore {
|
|
|
.bind(serialize_policy(&account.policy)?)
|
|
.bind(serialize_policy(&account.policy)?)
|
|
|
.bind(account.flags.bits() as i32)
|
|
.bind(account.flags.bits() as i32)
|
|
|
.bind(account.book.0)
|
|
.bind(account.book.0)
|
|
|
- .bind(serialize_blob(&account.user_data)?)
|
|
|
|
|
- .bind(serialize_blob(&account.metadata)?)
|
|
|
|
|
|
|
+ .bind(serialize_json(&account.user_data)?)
|
|
|
|
|
+ .bind(serialize_json(&account.metadata)?)
|
|
|
.execute(&self.pool)
|
|
.execute(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
@@ -325,7 +354,7 @@ impl PostingStore for SqlStore {
|
|
|
let mut result = Vec::with_capacity(ids.len());
|
|
let mut result = Vec::with_capacity(ids.len());
|
|
|
for id in ids {
|
|
for id in ids {
|
|
|
let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
|
|
let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
|
|
|
- .bind(id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&id.transfer))
|
|
|
.bind(id.index as i16)
|
|
.bind(id.index as i16)
|
|
|
.fetch_optional(&self.pool)
|
|
.fetch_optional(&self.pool)
|
|
|
.await
|
|
.await
|
|
@@ -453,7 +482,7 @@ impl PostingStore for SqlStore {
|
|
|
)
|
|
)
|
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
|
.bind(reservation.0)
|
|
.bind(reservation.0)
|
|
|
- .bind(id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&id.transfer))
|
|
|
.bind(id.index as i16)
|
|
.bind(id.index as i16)
|
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
|
.execute(&mut *tx)
|
|
.execute(&mut *tx)
|
|
@@ -485,7 +514,7 @@ impl PostingStore for SqlStore {
|
|
|
for id in ids {
|
|
for id in ids {
|
|
|
let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
|
|
let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
|
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
|
- .bind(id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&id.transfer))
|
|
|
.bind(id.index as i16)
|
|
.bind(id.index as i16)
|
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
|
.bind(reservation.0)
|
|
.bind(reservation.0)
|
|
@@ -519,7 +548,7 @@ impl PostingStore for SqlStore {
|
|
|
None => {
|
|
None => {
|
|
|
sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
|
|
sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
|
|
|
.bind(status_to_i16(PostingStatus::Inactive))
|
|
.bind(status_to_i16(PostingStatus::Inactive))
|
|
|
- .bind(id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&id.transfer))
|
|
|
.bind(id.index as i16)
|
|
.bind(id.index as i16)
|
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
.bind(status_to_i16(PostingStatus::Active))
|
|
|
.execute(&mut *tx)
|
|
.execute(&mut *tx)
|
|
@@ -528,7 +557,7 @@ impl PostingStore for SqlStore {
|
|
|
Some(rid) => {
|
|
Some(rid) => {
|
|
|
sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
|
|
sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
|
|
|
.bind(status_to_i16(PostingStatus::Inactive))
|
|
.bind(status_to_i16(PostingStatus::Inactive))
|
|
|
- .bind(id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&id.transfer))
|
|
|
.bind(id.index as i16)
|
|
.bind(id.index as i16)
|
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
.bind(status_to_i16(PostingStatus::PendingInactive))
|
|
|
.bind(rid.0)
|
|
.bind(rid.0)
|
|
@@ -556,7 +585,7 @@ impl PostingStore for SqlStore {
|
|
|
let res = sqlx::query(
|
|
let res = sqlx::query(
|
|
|
"INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
|
|
"INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
|
|
|
)
|
|
)
|
|
|
- .bind(posting.id.transfer.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(&posting.id.transfer))
|
|
|
.bind(posting.id.index as i16)
|
|
.bind(posting.id.index as i16)
|
|
|
.bind(posting.owner.0)
|
|
.bind(posting.owner.0)
|
|
|
.bind(posting.asset.0 as i32)
|
|
.bind(posting.asset.0 as i32)
|
|
@@ -582,7 +611,7 @@ impl PostingStore for SqlStore {
|
|
|
impl TransferStore for SqlStore {
|
|
impl TransferStore for SqlStore {
|
|
|
async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
|
|
async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
|
|
|
let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
|
|
let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
|
|
|
- .bind(id.0.as_slice())
|
|
|
|
|
|
|
+ .bind(envelope_id_to_hex(id))
|
|
|
.fetch_optional(&self.pool)
|
|
.fetch_optional(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
@@ -590,18 +619,18 @@ impl TransferStore for SqlStore {
|
|
|
match row {
|
|
match row {
|
|
|
None => Ok(None),
|
|
None => Ok(None),
|
|
|
Some(row) => {
|
|
Some(row) => {
|
|
|
- let transfer_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let transfer_json: String = row
|
|
|
.try_get("transfer")
|
|
.try_get("transfer")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let receipt_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let receipt_json: String = row
|
|
|
.try_get("receipt")
|
|
.try_get("receipt")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
let created_at: i64 = row
|
|
let created_at: i64 = row
|
|
|
.try_get("created_at")
|
|
.try_get("created_at")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
Ok(Some(EnvelopeRecord {
|
|
Ok(Some(EnvelopeRecord {
|
|
|
- envelope: deserialize_blob(&transfer_bytes)?,
|
|
|
|
|
- receipt: deserialize_blob(&receipt_bytes)?,
|
|
|
|
|
|
|
+ envelope: deserialize_json(&transfer_json)?,
|
|
|
|
|
+ receipt: deserialize_json(&receipt_json)?,
|
|
|
created_at,
|
|
created_at,
|
|
|
}))
|
|
}))
|
|
|
}
|
|
}
|
|
@@ -614,8 +643,9 @@ impl TransferStore for SqlStore {
|
|
|
involved: &[AccountId],
|
|
involved: &[AccountId],
|
|
|
) -> Result<u64, StoreError> {
|
|
) -> Result<u64, StoreError> {
|
|
|
let tid = record.receipt.transfer_id;
|
|
let tid = record.receipt.transfer_id;
|
|
|
- let transfer_bytes = serialize_blob(&record.envelope)?;
|
|
|
|
|
- let receipt_bytes = serialize_blob(&record.receipt)?;
|
|
|
|
|
|
|
+ let tid_hex = envelope_id_to_hex(&tid);
|
|
|
|
|
+ let transfer_json = serialize_json(&record.envelope)?;
|
|
|
|
|
+ let receipt_json = serialize_json(&record.receipt)?;
|
|
|
|
|
|
|
|
let mut tx = self
|
|
let mut tx = self
|
|
|
.pool
|
|
.pool
|
|
@@ -624,9 +654,9 @@ impl TransferStore for SqlStore {
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
|
|
|
|
|
let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
|
|
let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
|
|
|
- .bind(tid.0.as_slice())
|
|
|
|
|
- .bind(&transfer_bytes)
|
|
|
|
|
- .bind(&receipt_bytes)
|
|
|
|
|
|
|
+ .bind(&tid_hex)
|
|
|
|
|
+ .bind(&transfer_json)
|
|
|
|
|
+ .bind(&receipt_json)
|
|
|
.bind(record.created_at)
|
|
.bind(record.created_at)
|
|
|
.bind(record.envelope.book().0)
|
|
.bind(record.envelope.book().0)
|
|
|
.execute(&mut *tx)
|
|
.execute(&mut *tx)
|
|
@@ -638,7 +668,7 @@ impl TransferStore for SqlStore {
|
|
|
// computation). Idempotent so a replay is harmless.
|
|
// computation). Idempotent so a replay is harmless.
|
|
|
for account in involved {
|
|
for account in involved {
|
|
|
sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
|
|
sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
|
|
|
- .bind(tid.0.as_slice())
|
|
|
|
|
|
|
+ .bind(&tid_hex)
|
|
|
.bind(account.0)
|
|
.bind(account.0)
|
|
|
.execute(&mut *tx)
|
|
.execute(&mut *tx)
|
|
|
.await
|
|
.await
|
|
@@ -665,18 +695,18 @@ impl TransferStore for SqlStore {
|
|
|
|
|
|
|
|
let mut result = Vec::with_capacity(rows.len());
|
|
let mut result = Vec::with_capacity(rows.len());
|
|
|
for row in &rows {
|
|
for row in &rows {
|
|
|
- let transfer_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let transfer_json: String = row
|
|
|
.try_get("transfer")
|
|
.try_get("transfer")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let receipt_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let receipt_json: String = row
|
|
|
.try_get("receipt")
|
|
.try_get("receipt")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
let created_at: i64 = row
|
|
let created_at: i64 = row
|
|
|
.try_get("created_at")
|
|
.try_get("created_at")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
result.push(EnvelopeRecord {
|
|
result.push(EnvelopeRecord {
|
|
|
- envelope: deserialize_blob(&transfer_bytes)?,
|
|
|
|
|
- receipt: deserialize_blob(&receipt_bytes)?,
|
|
|
|
|
|
|
+ envelope: deserialize_json(&transfer_json)?,
|
|
|
|
|
+ receipt: deserialize_json(&receipt_json)?,
|
|
|
created_at,
|
|
created_at,
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
@@ -700,18 +730,18 @@ impl TransferStore for SqlStore {
|
|
|
|
|
|
|
|
let mut records = Vec::with_capacity(rows.len());
|
|
let mut records = Vec::with_capacity(rows.len());
|
|
|
for row in &rows {
|
|
for row in &rows {
|
|
|
- let transfer_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let transfer_json: String = row
|
|
|
.try_get("transfer")
|
|
.try_get("transfer")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let receipt_bytes: Vec<u8> = row
|
|
|
|
|
|
|
+ let receipt_json: String = row
|
|
|
.try_get("receipt")
|
|
.try_get("receipt")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
let created_at: i64 = row
|
|
let created_at: i64 = row
|
|
|
.try_get("created_at")
|
|
.try_get("created_at")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
records.push(EnvelopeRecord {
|
|
records.push(EnvelopeRecord {
|
|
|
- envelope: deserialize_blob(&transfer_bytes)?,
|
|
|
|
|
- receipt: deserialize_blob(&receipt_bytes)?,
|
|
|
|
|
|
|
+ envelope: deserialize_json(&transfer_json)?,
|
|
|
|
|
+ receipt: deserialize_json(&receipt_json)?,
|
|
|
created_at,
|
|
created_at,
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
@@ -762,7 +792,7 @@ impl SagaStore for SqlStore {
|
|
|
ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
|
|
ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
|
|
|
)
|
|
)
|
|
|
.bind(*id)
|
|
.bind(*id)
|
|
|
- .bind(&data)
|
|
|
|
|
|
|
+ .bind(to_hex(&data))
|
|
|
.execute(&self.pool)
|
|
.execute(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
@@ -779,10 +809,10 @@ impl SagaStore for SqlStore {
|
|
|
let id: i64 = row
|
|
let id: i64 = row
|
|
|
.try_get("id")
|
|
.try_get("id")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let data: Vec<u8> = row
|
|
|
|
|
|
|
+ let data_hex: String = row
|
|
|
.try_get("data")
|
|
.try_get("data")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- result.push((id, data));
|
|
|
|
|
|
|
+ result.push((id, from_hex(&data_hex)?));
|
|
|
}
|
|
}
|
|
|
Ok(result)
|
|
Ok(result)
|
|
|
}
|
|
}
|
|
@@ -806,25 +836,26 @@ impl EventStore for SqlStore {
|
|
|
async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
|
|
async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
|
|
|
let kind_str =
|
|
let kind_str =
|
|
|
serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let data = serialize_blob(event)?;
|
|
|
|
|
|
|
+ let data = serialize_json(event)?;
|
|
|
let seq = self.autoid.next() as u64;
|
|
let seq = self.autoid.next() as u64;
|
|
|
|
|
|
|
|
// Idempotent on the dedup key: a replayed transfer event conflicts on
|
|
// Idempotent on the dedup key: a replayed transfer event conflicts on
|
|
|
// `dedup_key` and returns the existing seq instead of a duplicate row.
|
|
// `dedup_key` and returns the existing seq instead of a duplicate row.
|
|
|
match kuatia_storage::events::event_dedup_key(&event.kind) {
|
|
match kuatia_storage::events::event_dedup_key(&event.kind) {
|
|
|
Some(eid) => {
|
|
Some(eid) => {
|
|
|
|
|
+ let dedup_hex = envelope_id_to_hex(&eid);
|
|
|
let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
|
|
let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
|
|
|
.bind(seq as i64)
|
|
.bind(seq as i64)
|
|
|
.bind(event.timestamp)
|
|
.bind(event.timestamp)
|
|
|
.bind(&kind_str)
|
|
.bind(&kind_str)
|
|
|
.bind(&data)
|
|
.bind(&data)
|
|
|
- .bind(eid.0.as_slice())
|
|
|
|
|
|
|
+ .bind(&dedup_hex)
|
|
|
.execute(&self.pool)
|
|
.execute(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
if res.rows_affected() == 0 {
|
|
if res.rows_affected() == 0 {
|
|
|
let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
|
|
let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
|
|
|
- .bind(eid.0.as_slice())
|
|
|
|
|
|
|
+ .bind(&dedup_hex)
|
|
|
.fetch_one(&self.pool)
|
|
.fetch_one(&self.pool)
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
@@ -868,10 +899,10 @@ impl EventStore for SqlStore {
|
|
|
let seq: i64 = row
|
|
let seq: i64 = row
|
|
|
.try_get("seq")
|
|
.try_get("seq")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let data: Vec<u8> = row
|
|
|
|
|
|
|
+ let data_json: String = row
|
|
|
.try_get("data")
|
|
.try_get("data")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- let mut event: LedgerEvent = deserialize_blob(&data)?;
|
|
|
|
|
|
|
+ let mut event: LedgerEvent = deserialize_json(&data_json)?;
|
|
|
event.seq = seq as u64;
|
|
event.seq = seq as u64;
|
|
|
events.push(event);
|
|
events.push(event);
|
|
|
}
|
|
}
|
|
@@ -895,7 +926,7 @@ impl BookStore for SqlStore {
|
|
|
return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
|
|
return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- let data = serialize_blob(&book)?;
|
|
|
|
|
|
|
+ let data = serialize_json(&book)?;
|
|
|
sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
|
|
sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
|
|
|
.bind(book.id.0)
|
|
.bind(book.id.0)
|
|
|
.bind(&book.name)
|
|
.bind(&book.name)
|
|
@@ -913,10 +944,10 @@ impl BookStore for SqlStore {
|
|
|
.await
|
|
.await
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?
|
|
|
.ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
|
|
.ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
|
|
|
- let data: Vec<u8> = row
|
|
|
|
|
|
|
+ let data: String = row
|
|
|
.try_get("data")
|
|
.try_get("data")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- deserialize_blob(&data)
|
|
|
|
|
|
|
+ deserialize_json(&data)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
|
|
async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
|
|
@@ -926,10 +957,10 @@ impl BookStore for SqlStore {
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
rows.iter()
|
|
rows.iter()
|
|
|
.map(|row| {
|
|
.map(|row| {
|
|
|
- let data: Vec<u8> = row
|
|
|
|
|
|
|
+ let data: String = row
|
|
|
.try_get("data")
|
|
.try_get("data")
|
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
.map_err(|e| StoreError::Internal(e.to_string()))?;
|
|
|
- deserialize_blob(&data)
|
|
|
|
|
|
|
+ deserialize_json(&data)
|
|
|
})
|
|
})
|
|
|
.collect()
|
|
.collect()
|
|
|
}
|
|
}
|