use crate::Error; use futures_util::{SinkExt, StreamExt}; use nostr_rs_types::{Request, Response}; use parking_lot::RwLock; use std::collections::HashMap; use tokio::{ net::TcpStream, sync::mpsc::{channel, Receiver, Sender}, }; use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream}; pub struct Connection { pub(crate) conn_id: u128, sender: Sender, subscriptions: RwLock>, } const MAX_SUBSCRIPTIONS_BUFFER: usize = 100; impl Connection { pub async fn new( broadcast_request: Sender<(u128, Request)>, stream: TcpStream, ) -> Result { let websocket = accept_async(stream).await?; let conn_id = 0; let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER); Self::spawn(broadcast_request, websocket, receiver, conn_id); Ok(Self { conn_id, sender, subscriptions: RwLock::new(HashMap::new()), }) } pub fn spawn( broadcast_request: Sender<(u128, Request)>, websocket: WebSocketStream, mut receiver: Receiver, conn_id: u128, ) { tokio::spawn(async move { let mut _subscriptions: HashMap)> = HashMap::new(); let (mut writer, mut reader) = websocket.split(); loop { tokio::select! { Some(msg) = receiver.recv() => { let msg = serde_json::to_string(&msg).unwrap(); writer.send(Message::Text(msg.into())).await.unwrap(); } Some(msg) = reader.next() => { if let Ok(Message::Text(msg)) = msg { let msg: Result = serde_json::from_str(&msg); if let Ok(msg) = msg { let _ = broadcast_request.send((conn_id, msg)).await; } } } else => { break; } } } }); } pub async fn send(&self, _response: Response) -> Result<(), Error> { Ok(()) } pub fn create_connection(&self, id: String) -> Result<(u128, u128, Sender), Error> { let mut subscriptions = self.subscriptions.write(); if subscriptions.contains_key(&id) { return Err(Error::IdentifierAlreadyUsed(id)); } let internal_id = 0; subscriptions.insert(id, internal_id); Ok((self.conn_id, internal_id, self.sender.clone())) } }