main.rs 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. use actix_web::{
  2. error::InternalError,
  3. get,
  4. middleware::Logger,
  5. post,
  6. rt::time::{self, Interval},
  7. web,
  8. web::Bytes,
  9. App, HttpResponse, HttpServer, Responder,
  10. };
  11. use futures_util::Stream;
  12. use serde::{Deserialize, Serialize};
  13. use serde_json::json;
  14. use std::{
  15. pin::Pin,
  16. sync::Arc,
  17. task::{Context, Poll},
  18. time::Duration,
  19. };
  20. use tokio::sync::mpsc::Receiver;
  21. use verax::{AccountId, AnyAmount, AnyId, Asset, Filter, RevId, Status, Tag, Type};
  22. #[derive(Deserialize)]
  23. pub struct Movement {
  24. pub account: AccountId,
  25. #[serde(flatten)]
  26. pub amount: AnyAmount,
  27. }
  28. #[derive(Deserialize)]
  29. pub struct Deposit {
  30. pub account: AccountId,
  31. #[serde(flatten)]
  32. pub amount: AnyAmount,
  33. pub memo: String,
  34. pub tags: Vec<Tag>,
  35. pub status: Status,
  36. }
  37. impl Deposit {
  38. pub async fn to_ledger_transaction(
  39. self,
  40. ledger: &Ledger,
  41. ) -> Result<verax::Transaction, verax::Error> {
  42. let zdeposit = ledger
  43. ._inner
  44. .deposit(
  45. &self.account,
  46. self.amount.try_into()?,
  47. self.status,
  48. vec![],
  49. self.memo,
  50. )
  51. .await?;
  52. Ok(if !self.tags.is_empty() {
  53. ledger
  54. ._inner
  55. .set_tags(zdeposit.revision_id, self.tags, "Update tags".to_owned())
  56. .await?
  57. } else {
  58. zdeposit
  59. })
  60. }
  61. }
  62. #[derive(Deserialize)]
  63. pub struct Transaction {
  64. pub debit: Vec<Movement>,
  65. pub credit: Vec<Movement>,
  66. pub memo: String,
  67. pub status: Status,
  68. }
  69. #[derive(Deserialize)]
  70. pub struct UpdateTransaction {
  71. pub status: Status,
  72. pub memo: String,
  73. }
  74. impl UpdateTransaction {
  75. pub async fn to_ledger_transaction(
  76. self,
  77. id: RevId,
  78. ledger: &Ledger,
  79. ) -> Result<verax::Transaction, verax::Error> {
  80. ledger
  81. ._inner
  82. .change_status(id, self.status, self.memo)
  83. .await
  84. }
  85. }
  86. impl Transaction {
  87. pub async fn to_ledger_transaction(
  88. self,
  89. ledger: &Ledger,
  90. ) -> Result<verax::Transaction, verax::Error> {
  91. let from = self
  92. .debit
  93. .into_iter()
  94. .map(|x| x.amount.try_into().map(|amount| (x.account, amount)))
  95. .collect::<Result<Vec<_>, _>>()?;
  96. let to = self
  97. .credit
  98. .into_iter()
  99. .map(|x| x.amount.try_into().map(|amount| (x.account, amount)))
  100. .collect::<Result<Vec<_>, _>>()?;
  101. ledger
  102. ._inner
  103. .new_transaction(self.memo, self.status, from, to)
  104. .await
  105. }
  106. }
  107. #[derive(Serialize)]
  108. struct AccountResponse {
  109. amount: String,
  110. cents: String,
  111. asset: Asset,
  112. }
  113. #[get("/balance/{id}")]
  114. async fn get_balance(info: web::Path<AccountId>, ctx: web::Data<Ledger>) -> impl Responder {
  115. match ctx._inner.get_balance(&info.0).await {
  116. Ok(balances) => HttpResponse::Ok().json(
  117. balances
  118. .into_iter()
  119. .map(|amount| AccountResponse {
  120. amount: amount.to_string(),
  121. cents: amount.cents().to_string(),
  122. asset: amount.asset().clone(),
  123. })
  124. .collect::<Vec<_>>(),
  125. ),
  126. Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
  127. }
  128. }
  129. #[get("/{id}")]
  130. async fn get_info(info: web::Path<AnyId>, ctx: web::Data<Ledger>) -> impl Responder {
  131. let (cache_for_ever, filter) = match info.0 {
  132. AnyId::Account(account_id) => (
  133. false,
  134. Filter {
  135. accounts: vec![account_id],
  136. typ: vec![Type::Deposit, Type::Withdrawal, Type::Transaction],
  137. ..Default::default()
  138. },
  139. ),
  140. AnyId::Revision(rev_id) => (
  141. true,
  142. Filter {
  143. revisions: vec![rev_id],
  144. limit: 1,
  145. ..Default::default()
  146. },
  147. ),
  148. AnyId::Transaction(transaction_id) => (
  149. false,
  150. Filter {
  151. ids: vec![transaction_id],
  152. limit: 1,
  153. ..Default::default()
  154. },
  155. ),
  156. AnyId::Payment(payment_id) => {
  157. let _ = ctx
  158. ._inner
  159. .get_payment_info(&payment_id)
  160. .await
  161. .map(|tx| HttpResponse::Ok().json(tx));
  162. todo!()
  163. }
  164. };
  165. let limit = filter.limit;
  166. ctx._inner
  167. .get_transactions(filter)
  168. .await
  169. .map(|results| {
  170. let json_response = if limit == 1 {
  171. serde_json::to_value(&results[0])
  172. } else {
  173. serde_json::to_value(&results)
  174. }
  175. .unwrap();
  176. if cache_for_ever {
  177. HttpResponse::Ok()
  178. .header(
  179. "Cache-Control",
  180. "public, max-age=31536000, s-maxage=31536000, immutable",
  181. )
  182. .header("Vary", "Accept-Encoding")
  183. .json(json_response)
  184. } else {
  185. HttpResponse::Ok().json(json_response)
  186. }
  187. })
  188. .map_err(|err| {
  189. HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
  190. })
  191. }
  192. struct SubscriberStream {
  193. receiver: Receiver<verax::Transaction>,
  194. ping_interval: Interval,
  195. }
  196. impl Stream for SubscriberStream {
  197. type Item = Result<Bytes, actix_web::Error>;
  198. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
  199. match Pin::new(&mut self.ping_interval).poll_tick(cx) {
  200. Poll::Ready(_) => {
  201. // Send a heartbeat message
  202. let heartbeat_bytes = Bytes::copy_from_slice(b"{\"ping\": \"\"}\n");
  203. return Poll::Ready(Some(Ok(heartbeat_bytes)));
  204. }
  205. Poll::Pending => {}
  206. }
  207. match self.receiver.poll_recv(cx) {
  208. Poll::Ready(Some(msg)) => {
  209. let mut msg = serde_json::to_vec(&msg)?;
  210. msg.push(b'\n');
  211. Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
  212. }
  213. Poll::Ready(None) => Poll::Ready(None),
  214. Poll::Pending => Poll::Pending,
  215. }
  216. }
  217. }
  218. #[get("/subscribe/tag/{tag}")]
  219. async fn subscribe_by_tag(tag: web::Path<Tag>, ctx: web::Data<Ledger>) -> impl Responder {
  220. let (_, receiver) = ctx
  221. ._inner
  222. .subscribe(Filter {
  223. tags: vec![tag.0],
  224. ..Default::default()
  225. })
  226. .await;
  227. HttpResponse::Ok()
  228. .content_type("application/json")
  229. .streaming(SubscriberStream {
  230. receiver,
  231. ping_interval: time::interval(Duration::from_secs(30)),
  232. })
  233. }
  234. #[post("/deposit")]
  235. async fn deposit(item: web::Json<Deposit>, ledger: web::Data<Ledger>) -> impl Responder {
  236. // Insert the item into a database or another data source.
  237. // For this example, we'll just echo the received item.
  238. match item.into_inner().to_ledger_transaction(&ledger).await {
  239. Ok(tx) => {
  240. // Insert the item into a database or another data source.
  241. // For this example, we'll just echo the received item.
  242. HttpResponse::Created().json(tx)
  243. }
  244. Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
  245. }
  246. }
  247. #[post("/tx")]
  248. async fn create_transaction(
  249. item: web::Json<Transaction>,
  250. ledger: web::Data<Ledger>,
  251. ) -> impl Responder {
  252. match item.into_inner().to_ledger_transaction(&ledger).await {
  253. Ok(tx) => HttpResponse::Accepted().json(tx),
  254. Err(err) => {
  255. HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
  256. }
  257. }
  258. }
  259. #[post("/{id}")]
  260. async fn update_status(
  261. info: web::Path<RevId>,
  262. item: web::Json<UpdateTransaction>,
  263. ctx: web::Data<Ledger>,
  264. ) -> impl Responder {
  265. match item.into_inner().to_ledger_transaction(info.0, &ctx).await {
  266. Ok(tx) => HttpResponse::Accepted().json(tx),
  267. Err(err) => {
  268. HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
  269. }
  270. }
  271. }
  272. pub struct Ledger {
  273. _inner: Arc<verax::Ledger<verax::storage::Cache<verax::storage::SQLite>>>,
  274. }
  275. #[actix_web::main]
  276. async fn main() -> std::io::Result<()> {
  277. if std::env::var_os("RUST_LOG").is_none() {
  278. std::env::set_var("RUST_LOG", "actix_web=info");
  279. }
  280. env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
  281. let settings = "sqlite:./test.db"
  282. .parse::<verax::storage::sqlite::SqliteConnectOptions>()
  283. .expect("valid settings")
  284. .journal_mode(verax::storage::sqlite::SqliteJournalMode::Wal)
  285. .create_if_missing(true);
  286. let pool = verax::storage::sqlite::SqlitePoolOptions::new()
  287. .connect_with(settings)
  288. .await
  289. .expect("pool");
  290. let storage = verax::storage::SQLite::new(pool.clone());
  291. storage.setup().await.expect("setup");
  292. let storage = verax::storage::SQLite::new(pool.clone());
  293. let storage = verax::storage::Cache::new(storage);
  294. let ledger = verax::Ledger::new(storage.into());
  295. HttpServer::new(move || {
  296. let ledger = ledger.clone();
  297. App::new()
  298. .wrap(Logger::default())
  299. .app_data(web::Data::new(Ledger { _inner: ledger }))
  300. .app_data(web::JsonConfig::default().error_handler(|err, _req| {
  301. InternalError::from_response(
  302. "",
  303. HttpResponse::BadRequest()
  304. .content_type("application/json")
  305. .body(format!(r#"{{"error":"{}"}}"#, err)),
  306. )
  307. .into()
  308. }))
  309. .service(subscribe_by_tag)
  310. .service(get_balance)
  311. .service(get_info)
  312. .service(deposit)
  313. .service(create_transaction)
  314. .service(update_status)
  315. })
  316. .bind("127.0.0.1:8080")?
  317. .run()
  318. .await
  319. }