123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- use actix_web::{
- error::InternalError,
- get,
- middleware::Logger,
- post,
- rt::time::{self, Interval},
- web,
- web::Bytes,
- App, HttpResponse, HttpServer, Responder,
- };
- use futures_util::Stream;
- use serde::{Deserialize, Serialize};
- use serde_json::json;
- use std::{
- pin::Pin,
- sync::Arc,
- task::{Context, Poll},
- time::Duration,
- };
- use tokio::sync::mpsc::Receiver;
- use verax::{AccountId, AnyAmount, AnyId, Asset, Filter, RevId, Status, Tag, Type};
- #[derive(Deserialize)]
- pub struct Movement {
- pub account: AccountId,
- #[serde(flatten)]
- pub amount: AnyAmount,
- }
- #[derive(Deserialize)]
- pub struct Deposit {
- pub account: AccountId,
- #[serde(flatten)]
- pub amount: AnyAmount,
- pub memo: String,
- pub tags: Vec<Tag>,
- pub status: Status,
- }
- impl Deposit {
- pub async fn to_ledger_transaction(
- self,
- ledger: &Ledger,
- ) -> Result<verax::Transaction, verax::Error> {
- let zdeposit = ledger
- ._inner
- .deposit(
- &self.account,
- self.amount.try_into()?,
- self.status,
- vec![],
- self.memo,
- )
- .await?;
- Ok(if !self.tags.is_empty() {
- ledger
- ._inner
- .set_tags(zdeposit.revision_id, self.tags, "Update tags".to_owned())
- .await?
- } else {
- zdeposit
- })
- }
- }
- #[derive(Deserialize)]
- pub struct Transaction {
- pub debit: Vec<Movement>,
- pub credit: Vec<Movement>,
- pub memo: String,
- pub status: Status,
- }
- #[derive(Deserialize)]
- pub struct UpdateTransaction {
- pub status: Status,
- pub memo: String,
- }
- impl UpdateTransaction {
- pub async fn to_ledger_transaction(
- self,
- id: RevId,
- ledger: &Ledger,
- ) -> Result<verax::Transaction, verax::Error> {
- ledger
- ._inner
- .change_status(id, self.status, self.memo)
- .await
- }
- }
- impl Transaction {
- pub async fn to_ledger_transaction(
- self,
- ledger: &Ledger,
- ) -> Result<verax::Transaction, verax::Error> {
- let from = self
- .debit
- .into_iter()
- .map(|x| x.amount.try_into().map(|amount| (x.account, amount)))
- .collect::<Result<Vec<_>, _>>()?;
- let to = self
- .credit
- .into_iter()
- .map(|x| x.amount.try_into().map(|amount| (x.account, amount)))
- .collect::<Result<Vec<_>, _>>()?;
- ledger
- ._inner
- .new_transaction(self.memo, self.status, from, to)
- .await
- }
- }
- #[derive(Serialize)]
- struct AccountResponse {
- amount: String,
- cents: String,
- asset: Asset,
- }
- #[get("/balance/{id}")]
- async fn get_balance(info: web::Path<AccountId>, ctx: web::Data<Ledger>) -> impl Responder {
- match ctx._inner.get_balance(&info.0).await {
- Ok(balances) => HttpResponse::Ok().json(
- balances
- .into_iter()
- .map(|amount| AccountResponse {
- amount: amount.to_string(),
- cents: amount.cents().to_string(),
- asset: amount.asset().clone(),
- })
- .collect::<Vec<_>>(),
- ),
- Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
- }
- }
- #[get("/{id}")]
- async fn get_info(info: web::Path<AnyId>, ctx: web::Data<Ledger>) -> impl Responder {
- let (cache_for_ever, filter) = match info.0 {
- AnyId::Account(account_id) => (
- false,
- Filter {
- accounts: vec![account_id],
- typ: vec![Type::Deposit, Type::Withdrawal, Type::Transaction],
- ..Default::default()
- },
- ),
- AnyId::Revision(rev_id) => (
- true,
- Filter {
- revisions: vec![rev_id],
- limit: 1,
- ..Default::default()
- },
- ),
- AnyId::Transaction(transaction_id) => (
- false,
- Filter {
- ids: vec![transaction_id],
- limit: 1,
- ..Default::default()
- },
- ),
- AnyId::Payment(payment_id) => {
- let _ = ctx
- ._inner
- .get_payment_info(&payment_id)
- .await
- .map(|tx| HttpResponse::Ok().json(tx));
- todo!()
- }
- };
- let limit = filter.limit;
- ctx._inner
- .get_transactions(filter)
- .await
- .map(|results| {
- let json_response = if limit == 1 {
- serde_json::to_value(&results[0])
- } else {
- serde_json::to_value(&results)
- }
- .unwrap();
- if cache_for_ever {
- HttpResponse::Ok()
- .header(
- "Cache-Control",
- "public, max-age=31536000, s-maxage=31536000, immutable",
- )
- .header("Vary", "Accept-Encoding")
- .json(json_response)
- } else {
- HttpResponse::Ok().json(json_response)
- }
- })
- .map_err(|err| {
- HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
- })
- }
- struct SubscriberStream {
- receiver: Receiver<verax::Transaction>,
- ping_interval: Interval,
- }
- impl Stream for SubscriberStream {
- type Item = Result<Bytes, actix_web::Error>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match Pin::new(&mut self.ping_interval).poll_tick(cx) {
- Poll::Ready(_) => {
- // Send a heartbeat message
- let heartbeat_bytes = Bytes::copy_from_slice(b"{\"ping\": \"\"}\n");
- return Poll::Ready(Some(Ok(heartbeat_bytes)));
- }
- Poll::Pending => {}
- }
- match self.receiver.poll_recv(cx) {
- Poll::Ready(Some(msg)) => {
- let mut msg = serde_json::to_vec(&msg)?;
- msg.push(b'\n');
- Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
- }
- Poll::Ready(None) => Poll::Ready(None),
- Poll::Pending => Poll::Pending,
- }
- }
- }
- #[get("/subscribe/tag/{tag}")]
- async fn subscribe_by_tag(tag: web::Path<Tag>, ctx: web::Data<Ledger>) -> impl Responder {
- let (_, receiver) = ctx
- ._inner
- .subscribe(Filter {
- tags: vec![tag.0],
- ..Default::default()
- })
- .await;
- HttpResponse::Ok()
- .content_type("application/json")
- .streaming(SubscriberStream {
- receiver,
- ping_interval: time::interval(Duration::from_secs(30)),
- })
- }
- #[post("/deposit")]
- async fn deposit(item: web::Json<Deposit>, ledger: web::Data<Ledger>) -> impl Responder {
- // Insert the item into a database or another data source.
- // For this example, we'll just echo the received item.
- match item.into_inner().to_ledger_transaction(&ledger).await {
- Ok(tx) => {
- // Insert the item into a database or another data source.
- // For this example, we'll just echo the received item.
- HttpResponse::Created().json(tx)
- }
- Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
- }
- }
- #[post("/tx")]
- async fn create_transaction(
- item: web::Json<Transaction>,
- ledger: web::Data<Ledger>,
- ) -> impl Responder {
- match item.into_inner().to_ledger_transaction(&ledger).await {
- Ok(tx) => HttpResponse::Accepted().json(tx),
- Err(err) => {
- HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
- }
- }
- }
- #[post("/{id}")]
- async fn update_status(
- info: web::Path<RevId>,
- item: web::Json<UpdateTransaction>,
- ctx: web::Data<Ledger>,
- ) -> impl Responder {
- match item.into_inner().to_ledger_transaction(info.0, &ctx).await {
- Ok(tx) => HttpResponse::Accepted().json(tx),
- Err(err) => {
- HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
- }
- }
- }
- pub struct Ledger {
- _inner: Arc<verax::Ledger<verax::storage::Cache<verax::storage::SQLite>>>,
- }
- #[actix_web::main]
- async fn main() -> std::io::Result<()> {
- if std::env::var_os("RUST_LOG").is_none() {
- std::env::set_var("RUST_LOG", "actix_web=info");
- }
- env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
- let settings = "sqlite:./test.db"
- .parse::<verax::storage::sqlite::SqliteConnectOptions>()
- .expect("valid settings")
- .journal_mode(verax::storage::sqlite::SqliteJournalMode::Wal)
- .create_if_missing(true);
- let pool = verax::storage::sqlite::SqlitePoolOptions::new()
- .connect_with(settings)
- .await
- .expect("pool");
- let storage = verax::storage::SQLite::new(pool.clone());
- storage.setup().await.expect("setup");
- let storage = verax::storage::SQLite::new(pool.clone());
- let storage = verax::storage::Cache::new(storage);
- let ledger = verax::Ledger::new(storage.into());
- HttpServer::new(move || {
- let ledger = ledger.clone();
- App::new()
- .wrap(Logger::default())
- .app_data(web::Data::new(Ledger { _inner: ledger }))
- .app_data(web::JsonConfig::default().error_handler(|err, _req| {
- InternalError::from_response(
- "",
- HttpResponse::BadRequest()
- .content_type("application/json")
- .body(format!(r#"{{"error":"{}"}}"#, err)),
- )
- .into()
- }))
- .service(subscribe_by_tag)
- .service(get_balance)
- .service(get_info)
- .service(deposit)
- .service(create_transaction)
- .service(update_status)
- })
- .bind("127.0.0.1:8080")?
- .run()
- .await
- }
|