Przeglądaj źródła

Move subscribe to their own file

Cesar Rodas 10 miesięcy temu
rodzic
commit
d9dee8e595
3 zmienionych plików z 80 dodań i 76 usunięć
  1. 6 64
      src/main.rs
  2. 62 0
      src/subscribe.rs
  3. 12 12
      utxo/src/filter.rs

+ 6 - 64
src/main.rs

@@ -1,25 +1,15 @@
 use actix_web::{
-    error::InternalError,
-    get,
-    middleware::Logger,
-    post,
-    rt::time::{self, Interval},
-    web,
-    web::Bytes,
-    App, HttpResponse, HttpServer, Responder,
+    error::InternalError, get, middleware::Logger, post, web, App, HttpResponse, HttpServer,
+    Responder,
 };
-use futures_util::Stream;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use std::{
-    pin::Pin,
-    sync::Arc,
-    task::{Context, Poll},
-    time::Duration,
-};
-use tokio::sync::mpsc::Receiver;
+use std::sync::Arc;
+use subscribe::subscribe_by_tag;
 use verax::{AccountId, AnyAmount, AnyId, Asset, Filter, RevId, Status, Tag, Type};
 
+mod subscribe;
+
 #[derive(Deserialize)]
 pub struct Movement {
     pub account: AccountId,
@@ -209,54 +199,6 @@ async fn get_info(info: web::Path<AnyId>, ctx: web::Data<Ledger>) -> impl Respon
         })
 }
 
-struct SubscriberStream {
-    receiver: Receiver<verax::Transaction>,
-    ping_interval: Interval,
-}
-
-impl Stream for SubscriberStream {
-    type Item = Result<Bytes, actix_web::Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match Pin::new(&mut self.ping_interval).poll_tick(cx) {
-            Poll::Ready(_) => {
-                // Send a heartbeat message
-                let heartbeat_bytes = Bytes::copy_from_slice(b"{\"ping\": \"\"}\n");
-                return Poll::Ready(Some(Ok(heartbeat_bytes)));
-            }
-            Poll::Pending => {}
-        }
-
-        match self.receiver.poll_recv(cx) {
-            Poll::Ready(Some(msg)) => {
-                let mut msg = serde_json::to_vec(&msg)?;
-                msg.push(b'\n');
-                Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
-            }
-            Poll::Ready(None) => Poll::Ready(None),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-#[get("/subscribe/tag/{tag}")]
-async fn subscribe_by_tag(tag: web::Path<Tag>, ctx: web::Data<Ledger>) -> impl Responder {
-    let (_, receiver) = ctx
-        ._inner
-        .subscribe(Filter {
-            tags: vec![tag.0],
-            ..Default::default()
-        })
-        .await;
-
-    HttpResponse::Ok()
-        .content_type("application/json")
-        .streaming(SubscriberStream {
-            receiver,
-            ping_interval: time::interval(Duration::from_secs(30)),
-        })
-}
-
 #[post("/deposit")]
 async fn deposit(item: web::Json<Deposit>, ledger: web::Data<Ledger>) -> impl Responder {
     // Insert the item into a database or another data source.

+ 62 - 0
src/subscribe.rs

@@ -0,0 +1,62 @@
+use crate::Ledger;
+use actix_web::{
+    post,
+    rt::time::{self, Interval},
+    web,
+    web::Bytes,
+    HttpResponse, Responder,
+};
+use futures_util::Stream;
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+    time::Duration,
+};
+use tokio::sync::mpsc::Receiver;
+use verax::Filter;
+
+struct SubscriberStream {
+    receiver: Receiver<verax::Transaction>,
+    ping_interval: Interval,
+    ping: u128,
+}
+
+impl Stream for SubscriberStream {
+    type Item = Result<Bytes, actix_web::Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match Pin::new(&mut self.ping_interval).poll_tick(cx) {
+            Poll::Ready(_) => {
+                // Send a heartbeat message
+                self.ping += 1;
+                let message = format!("{}\"ping\": {}{}\n", "{", self.ping, "}");
+                let heartbeat_bytes = Bytes::copy_from_slice(&message.as_bytes());
+                return Poll::Ready(Some(Ok(heartbeat_bytes)));
+            }
+            Poll::Pending => {}
+        }
+
+        match self.receiver.poll_recv(cx) {
+            Poll::Ready(Some(msg)) => {
+                let mut msg = serde_json::to_vec(&msg)?;
+                msg.push(b'\n');
+                Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
+            }
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+#[post("/subscribe")]
+pub async fn subscribe_by_tag(tag: web::Json<Filter>, ctx: web::Data<Ledger>) -> impl Responder {
+    let (_, receiver) = ctx._inner.subscribe(tag.0).await;
+
+    HttpResponse::Ok()
+        .content_type("application/json")
+        .streaming(SubscriberStream {
+            receiver,
+            ping_interval: time::interval(Duration::from_secs(30)),
+            ping: 0,
+        })
+}

+ 12 - 12
utxo/src/filter.rs

@@ -73,42 +73,42 @@ impl From<PrimaryFilter> for Vec<FilterableValue> {
 #[derive(Clone, Debug, Serialize, PartialEq, Eq, Hash, Deserialize, Default)]
 pub struct Filter {
     /// List of transaction IDs to query
-    #[serde(skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
     pub ids: Vec<TxId>,
     /// List of revisions to query
-    #[serde(skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
     pub revisions: Vec<RevId>,
     /// List of accounts to query their transactions
-    #[serde(skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
     pub accounts: Vec<AccountId>,
     /// List of transaction types-kind
-    #[serde(rename = "type", skip_serializing_if = "Vec::is_empty")]
+    #[serde(rename = "type", skip_serializing_if = "Vec::is_empty", default)]
     pub typ: Vec<Type>,
     /// List of statuses to query
-    #[serde(skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
     pub status: Vec<Status>,
     /// List of transactions by tags
-    #[serde(skip_serializing_if = "Vec::is_empty")]
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
     pub tags: Vec<Tag>,
     /// List transactions newer than this timestamp
     #[serde(
-        default,
         with = "option_ts_seconds",
-        skip_serializing_if = "Option::is_none"
+        skip_serializing_if = "Option::is_none",
+        default
     )]
     pub since: Option<DateTime<Utc>>,
     /// List transactions upto this timestamp
     #[serde(
-        default,
         with = "option_ts_seconds",
-        skip_serializing_if = "Option::is_none"
+        skip_serializing_if = "Option::is_none",
+        default
     )]
     pub until: Option<DateTime<Utc>>,
     /// Limit for transactions
-    #[serde(default, skip_serializing_if = "is_zero")]
+    #[serde(skip_serializing_if = "is_zero", default)]
     pub limit: usize,
     /// Skip the first `skip` transactions
-    #[serde(default, skip_serializing_if = "is_zero")]
+    #[serde(skip_serializing_if = "is_zero", default)]
     pub skip: usize,
 }