use crate::Context; use actix_web::{ post, rt::time::{self, Interval}, web, web::Bytes, HttpResponse, Responder, }; use futures_util::Stream; use std::{ pin::Pin, task::{Context as TaskContext, Poll}, time::Duration, }; use tokio::sync::mpsc::{Receiver, Sender}; use verax::Filter; const DEFAULT_PING_INTERVAL_SECS: u64 = 30; struct SubscriberStream { receiver: Receiver, ping_interval: Interval, ping: u128, } impl SubscriberStream { pub fn new(subscription: (Sender, Receiver)) -> Self { Self { receiver: subscription.1, ping_interval: time::interval(Duration::from_secs(DEFAULT_PING_INTERVAL_SECS)), ping: 0, } } } impl Stream for SubscriberStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { match Pin::new(&mut self.ping_interval).poll_tick(cx) { Poll::Ready(_) => { // Send a heartbeat message self.ping += 1; let message = format!("{}\"ping\":{}{}\n", "{", self.ping, "}"); let heartbeat_bytes = Bytes::copy_from_slice(&message.as_bytes()); return Poll::Ready(Some(Ok(heartbeat_bytes))); } Poll::Pending => {} } match self.receiver.poll_recv(cx) { Poll::Ready(Some(msg)) => { let mut msg = serde_json::to_vec(&msg)?; msg.push(b'\n'); Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg)))) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } #[post("/subscribe")] pub async fn handler(tag: web::Json, ctx: web::Data) -> impl Responder { HttpResponse::Ok() .content_type("application/json") .streaming(SubscriberStream::new(ctx.ledger.subscribe(tag.0).await)) }