use actix_web::{ error::InternalError, get, middleware::Logger, post, rt::time::{self, Interval}, web, web::Bytes, App, HttpResponse, HttpServer, Responder, }; use futures_util::Stream; use serde::{Deserialize, Serialize}; use serde_json::json; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration, }; use tokio::sync::mpsc::Receiver; use verax::{AccountId, AnyAmount, AnyId, Asset, Filter, RevId, Status, Tag, Type}; #[derive(Deserialize)] pub struct Movement { pub account: AccountId, #[serde(flatten)] pub amount: AnyAmount, } #[derive(Deserialize)] pub struct Deposit { pub account: AccountId, #[serde(flatten)] pub amount: AnyAmount, pub memo: String, pub tags: Vec, pub status: Status, } impl Deposit { pub async fn to_ledger_transaction( self, ledger: &Ledger, ) -> Result { let zdeposit = ledger ._inner .deposit( &self.account, self.amount.try_into()?, self.status, vec![], self.memo, ) .await?; Ok(if !self.tags.is_empty() { ledger ._inner .set_tags(zdeposit.revision_id, self.tags, "Update tags".to_owned()) .await? } else { zdeposit }) } } #[derive(Deserialize)] pub struct Transaction { pub debit: Vec, pub credit: Vec, pub memo: String, pub status: Status, } #[derive(Deserialize)] pub struct UpdateTransaction { pub status: Status, pub memo: String, } impl UpdateTransaction { pub async fn to_ledger_transaction( self, id: RevId, ledger: &Ledger, ) -> Result { ledger ._inner .change_status(id, self.status, self.memo) .await } } impl Transaction { pub async fn to_ledger_transaction( self, ledger: &Ledger, ) -> Result { let from = self .debit .into_iter() .map(|x| x.amount.try_into().map(|amount| (x.account, amount))) .collect::, _>>()?; let to = self .credit .into_iter() .map(|x| x.amount.try_into().map(|amount| (x.account, amount))) .collect::, _>>()?; ledger ._inner .new_transaction(self.memo, self.status, from, to) .await } } #[derive(Serialize)] struct AccountResponse { amount: String, cents: String, asset: Asset, } #[get("/balance/{id}")] async fn get_balance(info: web::Path, ctx: web::Data) -> impl Responder { match ctx._inner.get_balance(&info.0).await { Ok(balances) => HttpResponse::Ok().json( balances .into_iter() .map(|amount| AccountResponse { amount: amount.to_string(), cents: amount.cents().to_string(), asset: amount.asset().clone(), }) .collect::>(), ), Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})), } } #[get("/{id}")] async fn get_info(info: web::Path, ctx: web::Data) -> impl Responder { let (cache_for_ever, filter) = match info.0 { AnyId::Account(account_id) => ( false, Filter { accounts: vec![account_id], typ: vec![Type::Deposit, Type::Withdrawal, Type::Transaction], ..Default::default() }, ), AnyId::Revision(rev_id) => ( true, Filter { revisions: vec![rev_id], limit: 1, ..Default::default() }, ), AnyId::Transaction(transaction_id) => ( false, Filter { ids: vec![transaction_id], limit: 1, ..Default::default() }, ), AnyId::Payment(payment_id) => { let _ = ctx ._inner .get_payment_info(&payment_id) .await .map(|tx| HttpResponse::Ok().json(tx)); todo!() } }; let limit = filter.limit; ctx._inner .get_transactions(filter) .await .map(|results| { let json_response = if limit == 1 { serde_json::to_value(&results[0]) } else { serde_json::to_value(&results) } .unwrap(); if cache_for_ever { HttpResponse::Ok() .header( "Cache-Control", "public, max-age=31536000, s-maxage=31536000, immutable", ) .header("Vary", "Accept-Encoding") .json(json_response) } else { HttpResponse::Ok().json(json_response) } }) .map_err(|err| { HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err})) }) } struct SubscriberStream { receiver: Receiver, ping_interval: Interval, } impl Stream for SubscriberStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.ping_interval).poll_tick(cx) { Poll::Ready(_) => { // Send a heartbeat message let heartbeat_bytes = Bytes::copy_from_slice(b"{\"ping\": \"\"}\n"); return Poll::Ready(Some(Ok(heartbeat_bytes))); } Poll::Pending => {} } match self.receiver.poll_recv(cx) { Poll::Ready(Some(msg)) => { let mut msg = serde_json::to_vec(&msg)?; msg.push(b'\n'); Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg)))) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } #[get("/subscribe/tag/{tag}")] async fn subscribe_by_tag(tag: web::Path, ctx: web::Data) -> impl Responder { let (_, receiver) = ctx ._inner .subscribe(Filter { tags: vec![tag.0], ..Default::default() }) .await; HttpResponse::Ok() .content_type("application/json") .streaming(SubscriberStream { receiver, ping_interval: time::interval(Duration::from_secs(30)), }) } #[post("/deposit")] async fn deposit(item: web::Json, ledger: web::Data) -> impl Responder { // Insert the item into a database or another data source. // For this example, we'll just echo the received item. match item.into_inner().to_ledger_transaction(&ledger).await { Ok(tx) => { // Insert the item into a database or another data source. // For this example, we'll just echo the received item. HttpResponse::Created().json(tx) } Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})), } } #[post("/tx")] async fn create_transaction( item: web::Json, ledger: web::Data, ) -> impl Responder { match item.into_inner().to_ledger_transaction(&ledger).await { Ok(tx) => HttpResponse::Accepted().json(tx), Err(err) => { HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err})) } } } #[post("/{id}")] async fn update_status( info: web::Path, item: web::Json, ctx: web::Data, ) -> impl Responder { match item.into_inner().to_ledger_transaction(info.0, &ctx).await { Ok(tx) => HttpResponse::Accepted().json(tx), Err(err) => { HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err})) } } } pub struct Ledger { _inner: Arc>>, } #[actix_web::main] async fn main() -> std::io::Result<()> { if std::env::var_os("RUST_LOG").is_none() { std::env::set_var("RUST_LOG", "actix_web=info"); } env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); let settings = "sqlite:./test.db" .parse::() .expect("valid settings") .journal_mode(verax::storage::sqlite::SqliteJournalMode::Wal) .create_if_missing(true); let pool = verax::storage::sqlite::SqlitePoolOptions::new() .connect_with(settings) .await .expect("pool"); let storage = verax::storage::SQLite::new(pool.clone()); storage.setup().await.expect("setup"); let storage = verax::storage::SQLite::new(pool.clone()); let storage = verax::storage::Cache::new(storage); let ledger = verax::Ledger::new(storage.into()); HttpServer::new(move || { let ledger = ledger.clone(); App::new() .wrap(Logger::default()) .app_data(web::Data::new(Ledger { _inner: ledger })) .app_data(web::JsonConfig::default().error_handler(|err, _req| { InternalError::from_response( "", HttpResponse::BadRequest() .content_type("application/json") .body(format!(r#"{{"error":"{}"}}"#, err)), ) .into() })) .service(subscribe_by_tag) .service(get_balance) .service(get_info) .service(deposit) .service(create_transaction) .service(update_status) }) .bind("127.0.0.1:8080")? .run() .await }