Просмотр исходного кода

Remove actix in favour of Axum (#13)

César D. Rodas 9 месяцев назад
Родитель
Сommit
3f1a6103f7
11 измененных файлов с 287 добавлено и 687 удалено
  1. 157 551
      Cargo.lock
  2. 1 1
      Cargo.toml
  3. 12 8
      src/balance.rs
  4. 8 14
      src/deposit.rs
  5. 18 16
      src/get.rs
  6. 9 10
      src/lock.rs
  7. 20 28
      src/main.rs
  8. 21 14
      src/subscribe.rs
  9. 9 10
      src/tx.rs
  10. 18 16
      src/update.rs
  11. 14 19
      utxo/src/worker.rs

Разница между файлами не показана из-за своего большого размера
+ 157 - 551
Cargo.lock


+ 1 - 1
Cargo.toml

@@ -9,10 +9,10 @@ members = ["utxo"]
 
 [dependencies]
 verax = { path = "utxo", features = ["sqlite"] }
-actix-web = "3"
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 tokio = { version = "1.32.0", features = ["full"] }
 env_logger = "0.10.0"
 futures-util = "0.3.30"
 async-trait = "0.1.80"
+axum = { version = "0.7.5", features = ["macros", "ws", "http2"] }

+ 12 - 8
src/balance.rs

@@ -1,7 +1,10 @@
 use crate::{Context, Handler};
-use actix_web::{get, web, HttpResponse, Responder};
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    Json,
+};
 use serde::{Deserialize, Serialize};
-use serde_json::json;
 use verax::Asset;
 
 #[derive(Deserialize)]
@@ -40,10 +43,11 @@ impl Handler for AccountId {
     }
 }
 
-#[get("/balance/{id}")]
-pub async fn handler(info: web::Path<AccountId>, ctx: web::Data<Context>) -> impl Responder {
-    match info.0.handle(&ctx).await {
-        Ok(balances) => HttpResponse::Ok().json(balances),
-        Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
-    }
+pub async fn handler(
+    Path(account): Path<AccountId>,
+    State(ctx): State<Context>,
+) -> Result<Json<Vec<Balance>>, (StatusCode, String)> {
+    Ok(Json(account.handle(&ctx).await.map_err(|err| {
+        (StatusCode::BAD_REQUEST, err.to_string())
+    })?))
 }

+ 8 - 14
src/deposit.rs

@@ -1,7 +1,6 @@
 use crate::{Context, Handler};
-use actix_web::{post, web, HttpResponse, Responder};
+use axum::{extract::State, http::StatusCode, Json};
 use serde::Deserialize;
-use serde_json::json;
 use verax::{AccountId, AnyAmount, Status, Tag};
 
 #[derive(Deserialize)]
@@ -32,16 +31,11 @@ impl Handler for Deposit {
     }
 }
 
-#[post("/deposit")]
-pub async fn handler(item: web::Json<Deposit>, ledger: web::Data<Context>) -> impl Responder {
-    // Insert the item into a database or another data source.
-    // For this example, we'll just echo the received item.
-    match item.into_inner().handle(&ledger).await {
-        Ok(tx) => {
-            // Insert the item into a database or another data source.
-            // For this example, we'll just echo the received item.
-            HttpResponse::Created().json(tx)
-        }
-        Err(err) => HttpResponse::BadRequest().json(json!({ "text": err.to_string(), "err": err})),
-    }
+pub async fn handler(
+    State(ledger): State<Context>,
+    Json(item): Json<Deposit>,
+) -> Result<Json<verax::Transaction>, (StatusCode, String)> {
+    Ok(Json(item.handle(&ledger).await.map_err(|err| {
+        (StatusCode::BAD_REQUEST, err.to_string())
+    })?))
 }

+ 18 - 16
src/get.rs

@@ -1,11 +1,17 @@
 use crate::Context;
-use actix_web::{get, web, HttpResponse, Responder};
-use serde_json::json;
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    Json,
+};
 use verax::{AnyId, Filter, Type};
 
-#[get("/{id}")]
-async fn handler(info: web::Path<AnyId>, ctx: web::Data<Context>) -> impl Responder {
-    let (cache_for_ever, filter) = match info.0 {
+/// TODO: Implement the http caching
+pub async fn handler(
+    Path(id): Path<AnyId>,
+    State(ctx): State<Context>,
+) -> Result<Json<Vec<verax::Transaction>>, (StatusCode, String)> {
+    let (_cache_for_ever, filter) = match id {
         AnyId::Account(account_id) => (
             false,
             Filter {
@@ -33,29 +39,25 @@ async fn handler(info: web::Path<AnyId>, ctx: web::Data<Context>) -> impl Respon
         ),
 
         AnyId::Payment(payment_id) => {
-            let _ = ctx
-                .ledger
-                .get_payment_info(&payment_id)
-                .await
-                .map(|tx| HttpResponse::Ok().json(tx));
+            let _ = ctx.ledger.get_payment_info(&payment_id).await;
 
             todo!()
         }
     };
 
-    let limit = filter.limit;
-
     ctx.ledger
         .get_transactions(filter)
         .await
+        .map(Json)
+        /*
         .map(|results| {
+            /// REVISIT LATER TO IMPLEMENT THE CACHE
             let json_response = if limit == 1 {
                 serde_json::to_value(&results[0])
             } else {
                 serde_json::to_value(&results)
             }
             .unwrap();
-
             if cache_for_ever {
                 HttpResponse::Ok()
                     .header(
@@ -67,8 +69,8 @@ async fn handler(info: web::Path<AnyId>, ctx: web::Data<Context>) -> impl Respon
             } else {
                 HttpResponse::Ok().json(json_response)
             }
+            json_response
         })
-        .map_err(|err| {
-            HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
-        })
+        */
+        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
 }

+ 9 - 10
src/lock.rs

@@ -1,7 +1,6 @@
 use crate::{Context, Handler};
-use actix_web::{post, web, HttpResponse, Responder};
+use axum::{extract::State, http::StatusCode, Json};
 use serde::{Deserialize, Serialize};
-use serde_json::json;
 use verax::{RevId, TxId};
 
 #[derive(Deserialize)]
@@ -36,12 +35,12 @@ impl Handler for Lock {
     }
 }
 
-#[post("/lock")]
-async fn handler(item: web::Json<Lock>, ctx: web::Data<Context>) -> impl Responder {
-    match item.into_inner().handle(&ctx).await {
-        Ok(tx) => HttpResponse::Accepted().json(tx),
-        Err(err) => {
-            HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
-        }
-    }
+pub async fn handler(
+    State(ctx): State<Context>,
+    Json(item): Json<Lock>,
+) -> Result<Json<Response>, (StatusCode, String)> {
+    item.handle(&ctx)
+        .await
+        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
+        .map(Json)
 }

+ 20 - 28
src/main.rs

@@ -1,6 +1,9 @@
-use actix_web::{error::InternalError, middleware::Logger, web, App, HttpResponse, HttpServer};
+use axum::{
+    routing::{get, post},
+    Router,
+};
 use serde::Serialize;
-use std::sync::Arc;
+use std::{net::SocketAddr, sync::Arc};
 
 #[async_trait::async_trait]
 pub trait Handler {
@@ -18,11 +21,12 @@ mod subscribe;
 mod tx;
 mod update;
 
+#[derive(Clone)]
 pub struct Context {
     ledger: Arc<verax::Ledger<verax::storage::Cache<verax::storage::SQLite>>>,
 }
 
-#[actix_web::main]
+#[tokio::main]
 async fn main() -> std::io::Result<()> {
     if std::env::var_os("RUST_LOG").is_none() {
         std::env::set_var("RUST_LOG", "actix_web=info");
@@ -47,30 +51,18 @@ async fn main() -> std::io::Result<()> {
     let storage = verax::storage::Cache::new(storage);
     let ledger = verax::Ledger::new(storage.into());
 
-    HttpServer::new(move || {
-        let ledger = ledger.clone();
+    let app = Router::new()
+        .route("/balance/:account", get(balance::handler))
+        .route("/deposit", post(deposit::handler))
+        .route("/lock", post(lock::handler))
+        .route("/subscribe", post(subscribe::handler))
+        .route("/tx", post(tx::handler))
+        .route("/:id", post(update::handler))
+        .route("/:id", get(get::handler))
+        .with_state(Context { ledger: ledger });
 
-        App::new()
-            .wrap(Logger::default())
-            .app_data(web::Data::new(Context { ledger: ledger }))
-            .app_data(web::JsonConfig::default().error_handler(|err, _req| {
-                InternalError::from_response(
-                    "",
-                    HttpResponse::BadRequest()
-                        .content_type("application/json")
-                        .body(format!(r#"{{"error":"{}"}}"#, err)),
-                )
-                .into()
-            }))
-            .service(subscribe::handler)
-            .service(deposit::handler)
-            .service(balance::handler)
-            .service(lock::handler)
-            .service(tx::handler)
-            .service(update::handler)
-            .service(get::handler)
-    })
-    .bind("127.0.0.1:8080")?
-    .run()
-    .await
+    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
+    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
+    axum::serve(listener, app).await?;
+    Ok(())
 }

+ 21 - 14
src/subscribe.rs

@@ -1,18 +1,22 @@
 use crate::Context;
-use actix_web::{
-    post,
-    rt::time::{self, Interval},
-    web,
-    web::Bytes,
-    HttpResponse, Responder,
+use axum::{
+    body::{Body, Bytes},
+    extract::State,
+    http::{Response, StatusCode},
+    response::IntoResponse,
+    Json,
 };
 use futures_util::Stream;
 use std::{
+    convert::Infallible,
     pin::Pin,
     task::{Context as TaskContext, Poll},
     time::Duration,
 };
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::{
+    sync::mpsc::{Receiver, Sender},
+    time::{self, Interval},
+};
 use verax::Filter;
 
 const DEFAULT_PING_INTERVAL_SECS: u64 = 30;
@@ -34,7 +38,7 @@ impl SubscriberStream {
 }
 
 impl Stream for SubscriberStream {
-    type Item = Result<Bytes, actix_web::Error>;
+    type Item = Result<Bytes, Infallible>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
         match Pin::new(&mut self.ping_interval).poll_tick(cx) {
@@ -50,7 +54,7 @@ impl Stream for SubscriberStream {
 
         match self.receiver.poll_recv(cx) {
             Poll::Ready(Some(msg)) => {
-                let mut msg = serde_json::to_vec(&msg)?;
+                let mut msg = serde_json::to_vec(&msg).unwrap_or_default();
                 msg.push(b'\n');
                 Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
             }
@@ -60,9 +64,12 @@ impl Stream for SubscriberStream {
     }
 }
 
-#[post("/subscribe")]
-pub async fn handler(tag: web::Json<Filter>, ctx: web::Data<Context>) -> impl Responder {
-    HttpResponse::Ok()
-        .content_type("application/json")
-        .streaming(SubscriberStream::new(ctx.ledger.subscribe(tag.0).await))
+pub async fn handler(State(ctx): State<Context>, Json(tag): Json<Filter>) -> impl IntoResponse {
+    Response::builder()
+        .status(StatusCode::OK)
+        .header("Content-Type", "application/json")
+        .body(Body::from_stream(SubscriberStream::new(
+            ctx.ledger.subscribe(tag).await,
+        )))
+        .unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response())
 }

+ 9 - 10
src/tx.rs

@@ -1,7 +1,6 @@
 use crate::{Context, Handler};
-use actix_web::{post, web, HttpResponse, Responder};
+use axum::{extract::State, http::StatusCode, Json};
 use serde::Deserialize;
-use serde_json::json;
 use verax::{AccountId, AnyAmount, Status};
 
 #[derive(Deserialize)]
@@ -44,12 +43,12 @@ impl Handler for Transaction {
     }
 }
 
-#[post("/tx")]
-async fn handler(item: web::Json<Transaction>, ledger: web::Data<Context>) -> impl Responder {
-    match item.into_inner().handle(&ledger).await {
-        Ok(tx) => HttpResponse::Accepted().json(tx),
-        Err(err) => {
-            HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
-        }
-    }
+pub async fn handler(
+    State(ledger): State<Context>,
+    Json(item): Json<Transaction>,
+) -> Result<Json<verax::Transaction>, (StatusCode, String)> {
+    item.handle(&ledger)
+        .await
+        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
+        .map(Json)
 }

+ 18 - 16
src/update.rs

@@ -1,7 +1,10 @@
 use crate::{Context, Handler};
-use actix_web::{post, web, HttpResponse, Responder};
+use axum::{
+    extract::{Path, State},
+    http::StatusCode,
+    Json,
+};
 use serde::Deserialize;
-use serde_json::json;
 use verax::{RevId, Status, Tag, TokenPayload};
 
 #[derive(Deserialize)]
@@ -59,20 +62,19 @@ impl Handler for Update {
     }
 }
 
-#[post("/{id}")]
-async fn handler(
-    info: web::Path<RevId>,
-    item: web::Json<UpdateOperation>,
-    ctx: web::Data<Context>,
-) -> impl Responder {
+pub async fn handler(
+    Path(info): Path<RevId>,
+    State(ctx): State<Context>,
+    Json(item): Json<UpdateOperation>,
+) -> Result<Json<verax::Transaction>, (StatusCode, String)> {
     let update = Update {
-        id: info.into_inner(),
-        operation: item.into_inner(),
+        id: info,
+        operation: item,
     };
-    match update.handle(&ctx).await {
-        Ok(tx) => HttpResponse::Accepted().json(tx),
-        Err(err) => {
-            HttpResponse::InternalServerError().json(json!({ "text": err.to_string(), "err": err}))
-        }
-    }
+
+    update
+        .handle(&ctx)
+        .await
+        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
+        .map(Json)
 }

+ 14 - 19
utxo/src/worker.rs

@@ -5,11 +5,9 @@ use parking_lot::RwLock;
 use std::{
     ops::Deref,
     sync::{atomic::AtomicBool, Arc},
-    thread,
     time::Duration,
 };
 use tokio::{
-    runtime::Runtime,
     sync::mpsc::{channel, error::TrySendError, Sender},
     time::sleep,
 };
@@ -81,26 +79,23 @@ impl<W: Worker + 'static> WorkerManager<W> {
         let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
         let worker_for_thread = self.worker.clone();
         let worker_in_scope = self.is_running.clone();
-        thread::spawn(|| {
-            let rt = Runtime::new().expect("Failed to create tokio runtime in worker.rs");
-            rt.block_on(async move {
-                let mut last_time = Utc::now();
-                loop {
-                    tokio::select! {
-                        Some(message) = receiver.recv() => {
-                            worker_for_thread.handler(message).await;
-                            last_time = Utc::now();
-                        }
-                        _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
+        tokio::spawn(async move {
+            let mut last_time = Utc::now();
+            loop {
+                tokio::select! {
+                    Some(message) = receiver.recv() => {
+                        worker_for_thread.handler(message).await;
+                        last_time = Utc::now();
                     }
+                    _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
+                }
 
-                    if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
-                        || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
-                    {
-                        break;
-                    }
+                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
+                    || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
+                {
+                    break;
                 }
-            });
+            }
         });
         sender
     }

Некоторые файлы не были показаны из-за большого количества измененных файлов