123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- use crate::{get_id, Error};
- use futures_util::{SinkExt, StreamExt};
- use nostr_rs_types::{
- relayer::{Auth, ROk},
- types::Addr,
- Request, Response,
- };
- use parking_lot::RwLock;
- use std::collections::HashMap;
- use tokio::{
- net::TcpStream,
- sync::mpsc::{channel, Receiver, Sender},
- };
- #[allow(unused_imports)]
- use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
- #[derive(Debug)]
- pub struct Connection {
- #[allow(unused)]
- pub(crate) conn_id: u128,
- sender: Sender<Response>,
- subscriptions: RwLock<HashMap<String, u128>>,
- }
/// Capacity of the bounded channel created for each connection's outgoing
/// responses.
const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
- impl Connection {
- #[cfg(test)]
- pub fn new_for_test() -> (Self, Receiver<Response>) {
- let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
- (
- Self {
- conn_id: 0,
- sender,
- subscriptions: RwLock::new(HashMap::new()),
- },
- receiver,
- )
- }
- pub async fn new(
- broadcast_request: Sender<(u128, Request)>,
- disconnection_notify: Option<Sender<u128>>,
- stream: TcpStream,
- ) -> Result<Self, Error> {
- let websocket = accept_async(stream).await?;
- let conn_id = get_id();
- let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
- Self::spawn(
- broadcast_request,
- websocket,
- receiver,
- disconnection_notify,
- conn_id,
- );
- let _ = sender.send(Auth::default().into()).await;
- Ok(Self {
- conn_id,
- sender,
- subscriptions: RwLock::new(HashMap::new()),
- })
- }
- #[allow(unused)]
- fn spawn(
- broadcast_request: Sender<(u128, Request)>,
- websocket: WebSocketStream<TcpStream>,
- mut receiver: Receiver<Response>,
- disconnection_notify: Option<Sender<u128>>,
- conn_id: u128,
- ) {
- tokio::spawn(async move {
- let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
- let (mut writer, mut reader) = websocket.split();
- loop {
- tokio::select! {
- Some(msg) = receiver.recv() => {
- let msg = if let Ok(msg) = serde_json::to_string(&msg) {
- msg
- } else {
- continue;
- };
- if let Err(err) = writer.send(Message::Text(msg)).await {
- log::error!("Error sending message to client: {}", err);
- break;
- }
- }
- Some(msg) = reader.next() => {
- if let Ok(Message::Text(msg)) = msg {
- let msg: Result<Request, _> = serde_json::from_str(&msg);
- match msg {
- Ok(msg) => {
- let _ = broadcast_request.send((conn_id, msg)).await;
- },
- Err(err) => {
- log::error!("Error parsing message from client: {}", err);
- let reply: Response = ROk {
- id: Addr::default(),
- status: false,
- message: "Error parsing message".to_owned(),
- }.into();
- let reply = if let Ok(reply) = serde_json::to_string(&reply) {
- reply
- } else {
- continue;
- };
- if let Err(err) = writer.send(Message::Text(reply)).await {
- log::error!("Error sending message to client: {}", err);
- break;
- }
- }
- };
- }
- }
- else => {
- break;
- }
- }
- }
- if let Some(disconnection_notify) = disconnection_notify {
- let _ = disconnection_notify.try_send(conn_id);
- }
- });
- }
- #[inline]
- pub fn send(&self, response: Response) -> Result<(), Error> {
- self.sender
- .try_send(response)
- .map_err(|e| Error::TrySendError(Box::new(e)))
- }
- #[inline]
- pub fn get_sender(&self) -> Sender<Response> {
- self.sender.clone()
- }
- pub fn get_subscription_id(&self, id: &str) -> Option<u128> {
- let subscriptions = self.subscriptions.read();
- subscriptions.get(id).copied()
- }
- pub fn create_subscription(&self, id: String) -> (u128, Sender<Response>) {
- let mut subscriptions = self.subscriptions.write();
- let internal_id = subscriptions.entry(id).or_insert_with(get_id);
- (*internal_id, self.sender.clone())
- }
- }
|