123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
use crate::Error;
use futures_util::{SinkExt, StreamExt};
use nostr_rs_types::{Request, Response};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::{
    net::TcpStream,
    sync::mpsc::{channel, Receiver, Sender},
};
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
- pub struct Connection {
- pub(crate) conn_id: u128,
- sender: Sender<Response>,
- subscriptions: RwLock<HashMap<String, u128>>,
- }
/// Capacity of the bounded channel that carries outgoing responses to the
/// task writing on the websocket.
const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
- impl Connection {
- pub async fn new(
- broadcast_request: Sender<(u128, Request)>,
- stream: TcpStream,
- ) -> Result<Self, Error> {
- let websocket = accept_async(stream).await?;
- let conn_id = 0;
- let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
- Self::spawn(broadcast_request, websocket, receiver, conn_id);
- Ok(Self {
- conn_id,
- sender,
- subscriptions: RwLock::new(HashMap::new()),
- })
- }
- pub fn spawn(
- broadcast_request: Sender<(u128, Request)>,
- websocket: WebSocketStream<TcpStream>,
- mut receiver: Receiver<Response>,
- conn_id: u128,
- ) {
- tokio::spawn(async move {
- let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
- let (mut writer, mut reader) = websocket.split();
- loop {
- tokio::select! {
- Some(msg) = receiver.recv() => {
- let msg = serde_json::to_string(&msg).unwrap();
- writer.send(Message::Text(msg.into())).await.unwrap();
- }
- Some(msg) = reader.next() => {
- if let Ok(Message::Text(msg)) = msg {
- let msg: Result<Request, _> = serde_json::from_str(&msg);
- if let Ok(msg) = msg {
- let _ = broadcast_request.send((conn_id, msg)).await;
- }
- }
- }
- else => {
- break;
- }
- }
- }
- });
- }
- pub async fn send(&self, _response: Response) -> Result<(), Error> {
- Ok(())
- }
- pub fn create_connection(&self, id: String) -> Result<(u128, u128, Sender<Response>), Error> {
- let mut subscriptions = self.subscriptions.write();
- if subscriptions.contains_key(&id) {
- return Err(Error::IdentifierAlreadyUsed(id));
- }
- let internal_id = 0;
- subscriptions.insert(id, internal_id);
- Ok((self.conn_id, internal_id, self.sender.clone()))
- }
- }
|