|
@@ -12,6 +12,7 @@ use tokio::{
|
|
mpsc::{channel, Receiver, Sender},
|
|
mpsc::{channel, Receiver, Sender},
|
|
RwLock,
|
|
RwLock,
|
|
},
|
|
},
|
|
|
|
+ task::JoinHandle,
|
|
};
|
|
};
|
|
#[allow(unused_imports)]
|
|
#[allow(unused_imports)]
|
|
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
|
|
use tokio_tungstenite::{accept_async, tungstenite::Message, WebSocketStream};
|
|
@@ -22,10 +23,19 @@ pub struct Connection {
|
|
pub(crate) conn_id: u128,
|
|
pub(crate) conn_id: u128,
|
|
sender: Sender<Response>,
|
|
sender: Sender<Response>,
|
|
subscriptions: RwLock<HashMap<String, u128>>,
|
|
subscriptions: RwLock<HashMap<String, u128>>,
|
|
|
|
+ handler: Option<JoinHandle<()>>,
|
|
}
|
|
}
|
|
|
|
|
|
const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
|
|
const MAX_SUBSCRIPTIONS_BUFFER: usize = 100;
|
|
|
|
|
|
|
|
+impl Drop for Connection {
|
|
|
|
+ fn drop(&mut self) {
|
|
|
|
+ if let Some(handler) = self.handler.take() {
|
|
|
|
+ let _ = handler.abort();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
impl Connection {
|
|
impl Connection {
|
|
#[cfg(test)]
|
|
#[cfg(test)]
|
|
pub fn new_for_test() -> (Self, Receiver<Response>) {
|
|
pub fn new_for_test() -> (Self, Receiver<Response>) {
|
|
@@ -35,12 +45,13 @@ impl Connection {
|
|
conn_id: 0,
|
|
conn_id: 0,
|
|
sender,
|
|
sender,
|
|
subscriptions: RwLock::new(HashMap::new()),
|
|
subscriptions: RwLock::new(HashMap::new()),
|
|
|
|
+ handler: None,
|
|
},
|
|
},
|
|
receiver,
|
|
receiver,
|
|
)
|
|
)
|
|
}
|
|
}
|
|
|
|
|
|
- pub async fn new(
|
|
|
|
|
|
+ pub async fn new_connection(
|
|
broadcast_request: Sender<(u128, Request)>,
|
|
broadcast_request: Sender<(u128, Request)>,
|
|
disconnection_notify: Option<Sender<u128>>,
|
|
disconnection_notify: Option<Sender<u128>>,
|
|
stream: TcpStream,
|
|
stream: TcpStream,
|
|
@@ -48,29 +59,28 @@ impl Connection {
|
|
let websocket = accept_async(stream).await?;
|
|
let websocket = accept_async(stream).await?;
|
|
let conn_id = get_id();
|
|
let conn_id = get_id();
|
|
let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
|
|
let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
|
|
- Self::spawn(
|
|
|
|
- broadcast_request,
|
|
|
|
- websocket,
|
|
|
|
- receiver,
|
|
|
|
- disconnection_notify,
|
|
|
|
- conn_id,
|
|
|
|
- );
|
|
|
|
let _ = sender.send(Auth::default().into()).await;
|
|
let _ = sender.send(Auth::default().into()).await;
|
|
Ok(Self {
|
|
Ok(Self {
|
|
conn_id,
|
|
conn_id,
|
|
sender,
|
|
sender,
|
|
subscriptions: RwLock::new(HashMap::new()),
|
|
subscriptions: RwLock::new(HashMap::new()),
|
|
|
|
+ handler: Some(Self::spawn(
|
|
|
|
+ broadcast_request,
|
|
|
|
+ websocket,
|
|
|
|
+ receiver,
|
|
|
|
+ disconnection_notify,
|
|
|
|
+ conn_id,
|
|
|
|
+ )),
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
- #[allow(unused)]
|
|
|
|
fn spawn(
|
|
fn spawn(
|
|
- broadcast_request: Sender<(u128, Request)>,
|
|
|
|
|
|
+ publish_message_from_client: Sender<(u128, Request)>,
|
|
websocket: WebSocketStream<TcpStream>,
|
|
websocket: WebSocketStream<TcpStream>,
|
|
mut receiver: Receiver<Response>,
|
|
mut receiver: Receiver<Response>,
|
|
disconnection_notify: Option<Sender<u128>>,
|
|
disconnection_notify: Option<Sender<u128>>,
|
|
conn_id: u128,
|
|
conn_id: u128,
|
|
- ) {
|
|
|
|
|
|
+ ) -> JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
tokio::spawn(async move {
|
|
let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
|
|
let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
|
|
let (mut writer, mut reader) = websocket.split();
|
|
let (mut writer, mut reader) = websocket.split();
|
|
@@ -92,7 +102,7 @@ impl Connection {
|
|
let msg: Result<Request, _> = serde_json::from_str(&msg);
|
|
let msg: Result<Request, _> = serde_json::from_str(&msg);
|
|
match msg {
|
|
match msg {
|
|
Ok(msg) => {
|
|
Ok(msg) => {
|
|
- let _ = broadcast_request.send((conn_id, msg)).await;
|
|
|
|
|
|
+ let _ = publish_message_from_client.send((conn_id, msg)).await;
|
|
},
|
|
},
|
|
Err(err) => {
|
|
Err(err) => {
|
|
log::error!("Error parsing message from client: {}", err);
|
|
log::error!("Error parsing message from client: {}", err);
|
|
@@ -123,7 +133,7 @@ impl Connection {
|
|
if let Some(disconnection_notify) = disconnection_notify {
|
|
if let Some(disconnection_notify) = disconnection_notify {
|
|
let _ = disconnection_notify.try_send(conn_id);
|
|
let _ = disconnection_notify.try_send(conn_id);
|
|
}
|
|
}
|
|
- });
|
|
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
#[inline]
|
|
#[inline]
|