1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- use crate::Context;
- use actix_web::{
- post,
- rt::time::{self, Interval},
- web,
- web::Bytes,
- HttpResponse, Responder,
- };
- use futures_util::Stream;
- use std::{
- pin::Pin,
- task::{Context as TaskContext, Poll},
- time::Duration,
- };
- use tokio::sync::mpsc::{Receiver, Sender};
- use verax::Filter;
- const DEFAULT_PING_INTERVAL_SECS: u64 = 30;
- struct SubscriberStream {
- receiver: Receiver<verax::Transaction>,
- ping_interval: Interval,
- ping: u128,
- }
- impl SubscriberStream {
- pub fn new(subscription: (Sender<verax::Transaction>, Receiver<verax::Transaction>)) -> Self {
- Self {
- receiver: subscription.1,
- ping_interval: time::interval(Duration::from_secs(DEFAULT_PING_INTERVAL_SECS)),
- ping: 0,
- }
- }
- }
- impl Stream for SubscriberStream {
- type Item = Result<Bytes, actix_web::Error>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
- match Pin::new(&mut self.ping_interval).poll_tick(cx) {
- Poll::Ready(_) => {
- // Send a heartbeat message
- self.ping += 1;
- let message = format!("{}\"ping\":{}{}\n", "{", self.ping, "}");
- let heartbeat_bytes = Bytes::copy_from_slice(&message.as_bytes());
- return Poll::Ready(Some(Ok(heartbeat_bytes)));
- }
- Poll::Pending => {}
- }
- match self.receiver.poll_recv(cx) {
- Poll::Ready(Some(msg)) => {
- let mut msg = serde_json::to_vec(&msg)?;
- msg.push(b'\n');
- Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
- }
- Poll::Ready(None) => Poll::Ready(None),
- Poll::Pending => Poll::Pending,
- }
- }
- }
- #[post("/subscribe")]
- pub async fn handler(tag: web::Json<Filter>, ctx: web::Data<Context>) -> impl Responder {
- HttpResponse::Ok()
- .content_type("application/json")
- .streaming(SubscriberStream::new(ctx.ledger.subscribe(tag.0).await))
- }
|