瀏覽代碼

Fix missed events race when creating subscriptions

Previously, we fetched the initial state *before* registering the new
subscription. Any events emitted after the DB read but before the
subscription was installed were dropped—most visible under
low-resource conditions (e.g., CI).

Change:
- Register the subscription first, then asynchronously fetch and send
  the initial state (spawned task). This eliminates the window where
  events could be missed.
- Require `F: Send + Sync` and store `on_new_subscription` as `Arc<F>`
  so it can be safely used from the spawned task.

Result:
- No gap between “subscribe” and “start receiving,” avoiding lost events.
- Initial state still delivered, now via a background task.

Implementation highlights (see diff):
- `on_new_subscription: Option<Arc<F>>`
- Added `Send + Sync` bounds to `F`
- Move initial-state fetch into `tokio::spawn` after index insertion
Cesar Rodas 1 月之前
父節點
當前提交
83f2f6b07a
共有 1 個文件被更改,包括 39 次插入26 次删除
  1. 39 26
      crates/cdk/src/pub_sub.rs

+ 39 - 26
crates/cdk/src/pub_sub.rs

@@ -41,10 +41,10 @@ pub struct Manager<T, I, F>
 where
     T: Indexable<Type = I> + Clone + Send + Sync + 'static,
     I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + 'static,
+    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
 {
     indexes: IndexTree<T, I>,
-    on_new_subscription: Option<F>,
+    on_new_subscription: Option<Arc<F>>,
     unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
     active_subscriptions: Arc<AtomicUsize>,
     background_subscription_remover: Option<JoinHandle<()>>,
@@ -54,7 +54,7 @@ impl<T, I, F> Default for Manager<T, I, F>
 where
     T: Indexable<Type = I> + Clone + Send + Sync + 'static,
     I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + 'static,
+    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
 {
     fn default() -> Self {
         let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
@@ -79,11 +79,11 @@ impl<T, I, F> From<F> for Manager<T, I, F>
 where
     T: Indexable<Type = I> + Clone + Send + Sync + 'static,
     I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + 'static,
+    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
 {
     fn from(value: F) -> Self {
         let mut manager: Self = Default::default();
-        manager.on_new_subscription = Some(value);
+        manager.on_new_subscription = Some(Arc::new(value));
         manager
     }
 }
@@ -92,7 +92,7 @@ impl<T, I, F> Manager<T, I, F>
 where
     T: Indexable<Type = I> + Clone + Send + Sync + 'static,
     I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + 'static,
+    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
 {
     #[inline]
     /// Broadcast an event to all listeners
@@ -143,32 +143,45 @@ where
         indexes: Vec<Index<I>>,
     ) -> ActiveSubscription<T, I> {
         let (sender, receiver) = mpsc::channel(10);
-        if let Some(on_new_subscription) = self.on_new_subscription.as_ref() {
-            match on_new_subscription
-                .on_new_subscription(&indexes.iter().map(|x| x.deref()).collect::<Vec<_>>())
-                .await
-            {
-                Ok(events) => {
-                    for event in events {
-                        let _ = sender.try_send((sub_id.clone(), event));
-                    }
-                }
-                Err(err) => {
-                    tracing::info!(
-                        "Failed to get initial state for subscription: {:?}, {}",
-                        sub_id,
-                        err
-                    );
-                }
-            }
-        }
 
         let mut index_storage = self.indexes.write().await;
+        // Subscribe to events as soon as possible
         for index in indexes.clone() {
             index_storage.insert(index, sender.clone());
         }
         drop(index_storage);
 
+        if let Some(on_new_subscription) = self.on_new_subscription.clone() {
+            // After we're subscribed already, fetch the current status of matching events. It is
+            // down in another thread to return right away
+            let indexes_for_worker = indexes.clone();
+            let sub_id_for_worker = sub_id.clone();
+            tokio::spawn(async move {
+                match on_new_subscription
+                    .on_new_subscription(
+                        &indexes_for_worker
+                            .iter()
+                            .map(|x| x.deref())
+                            .collect::<Vec<_>>(),
+                    )
+                    .await
+                {
+                    Ok(events) => {
+                        for event in events {
+                            let _ = sender.try_send((sub_id_for_worker.clone(), event));
+                        }
+                    }
+                    Err(err) => {
+                        tracing::info!(
+                            "Failed to get initial state for subscription: {:?}, {}",
+                            sub_id_for_worker,
+                            err
+                        );
+                    }
+                }
+            });
+        }
+
         self.active_subscriptions
             .fetch_add(1, atomic::Ordering::Relaxed);
 
@@ -232,7 +245,7 @@ impl<T, I, F> Drop for Manager<T, I, F>
 where
     T: Indexable<Type = I> + Clone + Send + Sync + 'static,
     I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
-    F: OnNewSubscription<Index = I, Event = T> + 'static,
+    F: OnNewSubscription<Index = I, Event = T> + Send + Sync + 'static,
 {
     fn drop(&mut self) {
         if let Some(handler) = self.background_subscription_remover.take() {