Selaa lähdekoodia

Internal improvement to Pub/Sub (#19)

Instead of using Unbounded channels, add a cap for 1,000 messages in
the buffer. When the buffer is full, messages will be dropped. For this
to happen a connection in subscribe mode should have more than 1,000
other messages pending delivery. After that point, messages will be
dropped to avoid using too much memory.

In general it is a bad practice to have unlimited buffers lying around
in the codebase.
César D. Rodas 3 vuotta sitten
vanhempi
säilyke
fd3acd9bff

+ 3 - 3
src/cmd/mod.rs

@@ -23,7 +23,7 @@ mod test {
         net::{IpAddr, Ipv4Addr, SocketAddr},
         sync::Arc,
     };
-    use tokio::sync::mpsc::UnboundedReceiver;
+    use tokio::sync::mpsc::Receiver;
 
     pub fn create_connection() -> Arc<Connection> {
         let db = Arc::new(Db::new(1000));
@@ -34,7 +34,7 @@ mod test {
         all_connections.new_connection(db.clone(), client).1
     }
 
-    pub fn create_connection_and_pubsub() -> (UnboundedReceiver<Value>, Arc<Connection>) {
+    pub fn create_connection_and_pubsub() -> (Receiver<Value>, Arc<Connection>) {
         let db = Arc::new(Db::new(1000));
         let all_connections = Arc::new(Connections::new(db.clone()));
 
@@ -45,7 +45,7 @@ mod test {
 
     pub fn create_new_connection_from_connection(
         conn: &Connection,
-    ) -> (UnboundedReceiver<Value>, Arc<Connection>) {
+    ) -> (Receiver<Value>, Arc<Connection>) {
         let all_connections = conn.all_connections();
 
         let client = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);

+ 2 - 2
src/cmd/pubsub.rs

@@ -94,12 +94,12 @@ mod test {
         },
         value::Value,
     };
-    use tokio::sync::mpsc::UnboundedReceiver;
+    use tokio::sync::mpsc::Receiver;
 
     async fn test_subscription_confirmation_and_first_message(
         msg: &str,
         channel: &str,
-        recv: &mut UnboundedReceiver<Value>,
+        recv: &mut Receiver<Value>,
     ) {
         assert_eq!(
             Some(Value::Array(vec![

+ 2 - 2
src/connection/connections.rs

@@ -58,11 +58,11 @@ impl Connections {
         self: &Arc<Connections>,
         db: Arc<Db>,
         addr: SocketAddr,
-    ) -> (mpsc::UnboundedReceiver<Value>, Arc<Connection>) {
+    ) -> (mpsc::Receiver<Value>, Arc<Connection>) {
         let mut id = self.counter.write();
         *id += 1;
 
-        let (pubsub_sender, pubsub_receiver) = mpsc::unbounded_channel();
+        let (pubsub_sender, pubsub_receiver) = mpsc::channel(1_000);
 
         let conn = Arc::new(Connection {
             id: *id,

+ 3 - 3
src/connection/pubsub_connection.rs

@@ -13,7 +13,7 @@ use tokio::sync::mpsc;
 #[derive(Debug)]
 pub struct PubsubClient {
     meta: RwLock<MetaData>,
-    sender: mpsc::UnboundedSender<Value>,
+    sender: mpsc::Sender<Value>,
 }
 
 /// Metadata associated with a pubsub client
@@ -27,7 +27,7 @@ struct MetaData {
 
 impl PubsubClient {
     /// Creates a new pubsub client instance
-    pub fn new(sender: mpsc::UnboundedSender<Value>) -> Self {
+    pub fn new(sender: mpsc::Sender<Value>) -> Self {
         Self {
             meta: RwLock::new(MetaData {
                 subscriptions: HashMap::new(),
@@ -115,7 +115,7 @@ impl PubsubClient {
 
     /// Returns a copy of the pubsub sender. This sender object can be used to send messages (from
     /// other connections) to this connection.
-    pub fn sender(&self) -> mpsc::UnboundedSender<Value> {
+    pub fn sender(&self) -> mpsc::Sender<Value> {
         self.sender.clone()
     }
 }

+ 16 - 12
src/connection/pubsub_server.rs

@@ -8,7 +8,7 @@ use parking_lot::RwLock;
 use std::collections::HashMap;
 use tokio::sync::mpsc;
 
-type Sender = mpsc::UnboundedSender<Value>;
+type Sender = mpsc::Sender<Value>;
 type Subscription = HashMap<u128, Sender>;
 
 /// Pubsub global server structure
@@ -76,7 +76,7 @@ impl Pubsub {
                 *psubs += 1;
             }
 
-            let _ = conn.pubsub_client().sender().send(
+            let _ = conn.pubsub_client().sender().try_send(
                 vec![
                     "psubscribe".into(),
                     Value::Blob(bytes_channel.clone()),
@@ -96,12 +96,16 @@ impl Pubsub {
 
         if let Some(subs) = self.subscriptions.read().get(channel) {
             for sender in subs.values() {
-                let _ = sender.send(Value::Array(vec![
-                    "message".into(),
-                    Value::Blob(channel.clone()),
-                    Value::Blob(message.clone()),
-                ]));
-                i += 1;
+                if sender
+                    .try_send(Value::Array(vec![
+                        "message".into(),
+                        Value::Blob(channel.clone()),
+                        Value::Blob(message.clone()),
+                    ]))
+                    .is_ok()
+                {
+                    i += 1;
+                }
             }
         }
 
@@ -113,7 +117,7 @@ impl Pubsub {
             }
 
             for sub in subs.values() {
-                let _ = sub.send(Value::Array(vec![
+                let _ = sub.try_send(Value::Array(vec![
                     "pmessage".into(),
                     pattern.as_str().into(),
                     Value::Blob(channel.clone()),
@@ -136,7 +140,7 @@ impl Pubsub {
             .map(|channel| {
                 if let Some(subs) = all_subs.get_mut(channel) {
                     if let Some(sender) = subs.remove(&conn_id) {
-                        let _ = sender.send(Value::Array(vec![
+                        let _ = sender.try_send(Value::Array(vec![
                             "punsubscribe".into(),
                             channel.as_str().into(),
                             1.into(),
@@ -168,7 +172,7 @@ impl Pubsub {
                     subscriptions.insert(channel.clone(), h);
                 }
 
-                let _ = conn.pubsub_client().sender().send(
+                let _ = conn.pubsub_client().sender().try_send(
                     vec![
                         "subscribe".into(),
                         Value::Blob(channel.clone()),
@@ -190,7 +194,7 @@ impl Pubsub {
             .map(|channel| {
                 if let Some(subs) = all_subs.get_mut(channel) {
                     if let Some(sender) = subs.remove(&conn_id) {
-                        let _ = sender.send(Value::Array(vec![
+                        let _ = sender.try_send(Value::Array(vec![
                             "unsubscribe".into(),
                             Value::Blob(channel.clone()),
                             1.into(),