subscribe.rs 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. use cdk::subscription::Params;
  2. use cdk::ws::{WsResponseResult, WsSubscribeResponse};
  3. use super::{WsContext, WsError};
  4. /// The `handle` method is called when a client sends a subscription request
  5. pub(crate) async fn handle(
  6. context: &mut WsContext,
  7. params: Params,
  8. ) -> Result<WsResponseResult, WsError> {
  9. let sub_id = params.id.clone();
  10. if context.subscriptions.contains_key(&sub_id) {
  11. // Subscription ID already exits. Returns an error instead of
  12. // replacing the other subscription or avoiding it.
  13. return Err(WsError::InvalidParams);
  14. }
  15. let mut subscription = context
  16. .state
  17. .mint
  18. .pubsub_manager()
  19. .subscribe(params)
  20. .map_err(|_| WsError::ParseError)?;
  21. let publisher = context.publisher.clone();
  22. let sub_id_for_sender = sub_id.clone();
  23. context.subscriptions.insert(
  24. sub_id.clone(),
  25. tokio::spawn(async move {
  26. while let Some(response) = subscription.recv().await {
  27. let _ = publisher.try_send((sub_id_for_sender.clone(), response.into_inner()));
  28. }
  29. }),
  30. );
  31. Ok(WsSubscribeResponse {
  32. status: "OK".to_string(),
  33. sub_id,
  34. }
  35. .into())
  36. }