subscribe.rs 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. use cdk::subscription::{IndexableParams, Params};
  2. use cdk::ws::{WsResponseResult, WsSubscribeResponse};
  3. use super::{WsContext, WsError};
  4. /// The `handle` method is called when a client sends a subscription request
  5. pub(crate) async fn handle(
  6. context: &mut WsContext,
  7. params: Params,
  8. ) -> Result<WsResponseResult, WsError> {
  9. let sub_id = params.id.clone();
  10. if context.subscriptions.contains_key(&sub_id) {
  11. // Subscription ID already exits. Returns an error instead of
  12. // replacing the other subscription or avoiding it.
  13. return Err(WsError::InvalidParams);
  14. }
  15. let params: IndexableParams = params.into();
  16. let mut subscription = context
  17. .state
  18. .mint
  19. .pubsub_manager()
  20. .try_subscribe(params)
  21. .await
  22. .map_err(|_| WsError::ParseError)?;
  23. let publisher = context.publisher.clone();
  24. context.subscriptions.insert(
  25. sub_id.clone(),
  26. tokio::spawn(async move {
  27. while let Some(response) = subscription.recv().await {
  28. let _ = publisher.send(response).await;
  29. }
  30. }),
  31. );
  32. Ok(WsSubscribeResponse {
  33. status: "OK".to_string(),
  34. sub_id,
  35. }
  36. .into())
  37. }