connection.rs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. use crate::Error;
  2. use futures_util::{SinkExt, StreamExt};
  3. use nostr_rs_types::{Request, Response};
  4. use parking_lot::RwLock;
  5. use std::collections::HashMap;
  6. use tokio::{
  7. net::TcpStream,
  8. sync::mpsc::{channel, Receiver, Sender},
  9. };
  10. use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
  11. pub struct Connection {
  12. pub(crate) conn_id: u128,
  13. sender: Sender<Response>,
  14. subscriptions: RwLock<HashMap<String, u128>>,
  15. }
  16. const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
  17. impl Connection {
  18. pub async fn new(
  19. broadcast_request: Sender<(u128, Request)>,
  20. stream: TcpStream,
  21. ) -> Result<Self, Error> {
  22. let websocket = accept_async(stream).await?;
  23. let conn_id = 0;
  24. let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
  25. Self::spawn(broadcast_request, websocket, receiver, conn_id);
  26. Ok(Self {
  27. conn_id,
  28. sender,
  29. subscriptions: RwLock::new(HashMap::new()),
  30. })
  31. }
  32. pub fn spawn(
  33. broadcast_request: Sender<(u128, Request)>,
  34. websocket: WebSocketStream<TcpStream>,
  35. mut receiver: Receiver<Response>,
  36. conn_id: u128,
  37. ) {
  38. tokio::spawn(async move {
  39. let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
  40. let (mut writer, mut reader) = websocket.split();
  41. loop {
  42. tokio::select! {
  43. Some(msg) = receiver.recv() => {
  44. let msg = serde_json::to_string(&msg).unwrap();
  45. writer.send(Message::Text(msg.into())).await.unwrap();
  46. }
  47. Some(msg) = reader.next() => {
  48. if let Ok(Message::Text(msg)) = msg {
  49. let msg: Result<Request, _> = serde_json::from_str(&msg);
  50. if let Ok(msg) = msg {
  51. let _ = broadcast_request.send((conn_id, msg)).await;
  52. }
  53. }
  54. }
  55. else => {
  56. break;
  57. }
  58. }
  59. }
  60. });
  61. }
  62. pub async fn send(&self, _response: Response) -> Result<(), Error> {
  63. Ok(())
  64. }
  65. pub fn create_connection(&self, id: String) -> Result<(u128, u128, Sender<Response>), Error> {
  66. let mut subscriptions = self.subscriptions.write();
  67. if subscriptions.contains_key(&id) {
  68. return Err(Error::IdentifierAlreadyUsed(id));
  69. }
  70. let internal_id = 0;
  71. subscriptions.insert(id, internal_id);
  72. Ok((self.conn_id, internal_id, self.sender.clone()))
  73. }
  74. }