subscribe.rs 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. use crate::Context;
  2. use actix_web::{
  3. post,
  4. rt::time::{self, Interval},
  5. web,
  6. web::Bytes,
  7. HttpResponse, Responder,
  8. };
  9. use futures_util::Stream;
  10. use std::{
  11. pin::Pin,
  12. task::{Context as TaskContext, Poll},
  13. time::Duration,
  14. };
  15. use tokio::sync::mpsc::{Receiver, Sender};
  16. use verax::Filter;
  17. const DEFAULT_PING_INTERVAL_SECS: u64 = 30;
  18. struct SubscriberStream {
  19. receiver: Receiver<verax::Transaction>,
  20. ping_interval: Interval,
  21. ping: u128,
  22. }
  23. impl SubscriberStream {
  24. pub fn new(subscription: (Sender<verax::Transaction>, Receiver<verax::Transaction>)) -> Self {
  25. Self {
  26. receiver: subscription.1,
  27. ping_interval: time::interval(Duration::from_secs(DEFAULT_PING_INTERVAL_SECS)),
  28. ping: 0,
  29. }
  30. }
  31. }
  32. impl Stream for SubscriberStream {
  33. type Item = Result<Bytes, actix_web::Error>;
  34. fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
  35. match Pin::new(&mut self.ping_interval).poll_tick(cx) {
  36. Poll::Ready(_) => {
  37. // Send a heartbeat message
  38. self.ping += 1;
  39. let message = format!("{}\"ping\":{}{}\n", "{", self.ping, "}");
  40. let heartbeat_bytes = Bytes::copy_from_slice(&message.as_bytes());
  41. return Poll::Ready(Some(Ok(heartbeat_bytes)));
  42. }
  43. Poll::Pending => {}
  44. }
  45. match self.receiver.poll_recv(cx) {
  46. Poll::Ready(Some(msg)) => {
  47. let mut msg = serde_json::to_vec(&msg)?;
  48. msg.push(b'\n');
  49. Poll::Ready(Some(Ok(Bytes::copy_from_slice(&msg))))
  50. }
  51. Poll::Ready(None) => Poll::Ready(None),
  52. Poll::Pending => Poll::Pending,
  53. }
  54. }
  55. }
  56. #[post("/subscribe")]
  57. pub async fn handler(tag: web::Json<Filter>, ctx: web::Data<Context>) -> impl Responder {
  58. HttpResponse::Ok()
  59. .content_type("application/json")
  60. .streaming(SubscriberStream::new(ctx.ledger.subscribe(tag.0).await))
  61. }