|
|
@@ -3,9 +3,10 @@
|
|
|
//! Consumers are designed to connect to a producer, through a transport, and subscribe to events.
|
|
|
use std::collections::{HashMap, VecDeque};
|
|
|
use std::sync::atomic::AtomicBool;
|
|
|
-use std::sync::{Arc, RwLock};
|
|
|
+use std::sync::Arc;
|
|
|
use std::time::Duration;
|
|
|
|
|
|
+use parking_lot::RwLock;
|
|
|
use tokio::sync::mpsc;
|
|
|
use tokio::time::{sleep, Instant};
|
|
|
|
|
|
@@ -162,11 +163,7 @@ where
|
|
|
X: Into<S::Event>,
|
|
|
{
|
|
|
let event = event.into();
|
|
|
- let mut cached_events = self.cached_events.write().unwrap_or_else(|mut err| {
|
|
|
- **err.get_mut() = HashMap::new();
|
|
|
- self.cached_events.clear_poison();
|
|
|
- err.into_inner()
|
|
|
- });
|
|
|
+ let mut cached_events = self.cached_events.write();
|
|
|
|
|
|
for topic in event.get_topics() {
|
|
|
cached_events.insert(topic, event.clone());
|
|
|
@@ -222,13 +219,7 @@ where
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- if instance
|
|
|
- .remote_subscriptions
|
|
|
- .read()
|
|
|
- .map(|x| x.len())
|
|
|
- .unwrap_or_default()
|
|
|
- == 0
|
|
|
- {
|
|
|
+ if instance.remote_subscriptions.read().is_empty() {
|
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
continue;
|
|
|
}
|
|
|
@@ -242,22 +233,16 @@ where
|
|
|
let (sender, receiver) = mpsc::channel(INTERNAL_POLL_SIZE);
|
|
|
|
|
|
{
|
|
|
- *(instance.stream_ctrl.write().unwrap_or_else(|mut err| {
|
|
|
- **err.get_mut() = None;
|
|
|
- instance.stream_ctrl.clear_poison();
|
|
|
- err.into_inner()
|
|
|
- })) = Some(sender);
|
|
|
+ *instance.stream_ctrl.write() = Some(sender);
|
|
|
}
|
|
|
|
|
|
let current_subscriptions = {
|
|
|
- if let Ok(remote_subscriptions) = instance.remote_subscriptions.read() {
|
|
|
- remote_subscriptions
|
|
|
- .iter()
|
|
|
- .map(|(key, name)| (name.name.clone(), key.clone()))
|
|
|
- .collect::<Vec<_>>()
|
|
|
- } else {
|
|
|
- vec![]
|
|
|
- }
|
|
|
+ instance
|
|
|
+ .remote_subscriptions
|
|
|
+ .read()
|
|
|
+ .iter()
|
|
|
+ .map(|(key, name)| (name.name.clone(), key.clone()))
|
|
|
+ .collect::<Vec<_>>()
|
|
|
};
|
|
|
|
|
|
if let Err(err) = instance
|
|
|
@@ -285,27 +270,17 @@ where
|
|
|
}
|
|
|
|
|
|
// remove sender to stream, as there is no stream
|
|
|
- let _ = instance
|
|
|
- .stream_ctrl
|
|
|
- .write()
|
|
|
- .unwrap_or_else(|mut err| {
|
|
|
- **err.get_mut() = None;
|
|
|
- instance.stream_ctrl.clear_poison();
|
|
|
- err.into_inner()
|
|
|
- })
|
|
|
- .take();
|
|
|
+ let _ = instance.stream_ctrl.write().take();
|
|
|
}
|
|
|
|
|
|
if poll_supported {
|
|
|
let current_subscriptions = {
|
|
|
- if let Ok(remote_subscriptions) = instance.remote_subscriptions.read() {
|
|
|
- remote_subscriptions
|
|
|
- .iter()
|
|
|
- .map(|(key, name)| (name.name.clone(), key.clone()))
|
|
|
- .collect::<Vec<_>>()
|
|
|
- } else {
|
|
|
- vec![]
|
|
|
- }
|
|
|
+ instance
|
|
|
+ .remote_subscriptions
|
|
|
+ .read()
|
|
|
+ .iter()
|
|
|
+ .map(|(key, name)| (name.name.clone(), key.clone()))
|
|
|
+ .collect::<Vec<_>>()
|
|
|
};
|
|
|
|
|
|
if let Err(err) = instance
|
|
|
@@ -339,14 +314,10 @@ where
|
|
|
let topics = self
|
|
|
.subscriptions
|
|
|
.write()
|
|
|
- .map_err(|_| Error::Poison)?
|
|
|
.remove(&subscription_name)
|
|
|
.ok_or(Error::AlreadySubscribed)?;
|
|
|
|
|
|
- let mut remote_subscriptions = self
|
|
|
- .remote_subscriptions
|
|
|
- .write()
|
|
|
- .map_err(|_| Error::Poison)?;
|
|
|
+ let mut remote_subscriptions = self.remote_subscriptions.write();
|
|
|
|
|
|
for topic in topics {
|
|
|
let mut remote_subscription =
|
|
|
@@ -362,11 +333,7 @@ where
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
if remote_subscription.total_subscribers == 0 {
|
|
|
- let mut cached_events = self.cached_events.write().unwrap_or_else(|mut err| {
|
|
|
- **err.get_mut() = HashMap::new();
|
|
|
- self.cached_events.clear_poison();
|
|
|
- err.into_inner()
|
|
|
- });
|
|
|
+ let mut cached_events = self.cached_events.write();
|
|
|
|
|
|
cached_events.remove(&topic);
|
|
|
|
|
|
@@ -385,7 +352,7 @@ where
|
|
|
|
|
|
#[inline(always)]
|
|
|
fn message_to_stream(&self, message: StreamCtrl<T::Spec>) -> Result<(), Error> {
|
|
|
- let to_stream = self.stream_ctrl.read().map_err(|_| Error::Poison)?;
|
|
|
+ let to_stream = self.stream_ctrl.read();
|
|
|
|
|
|
if let Some(to_stream) = to_stream.as_ref() {
|
|
|
Ok(to_stream.try_send(message)?)
|
|
|
@@ -414,21 +381,15 @@ where
|
|
|
let subscription_name = request.subscription_name();
|
|
|
let topics = request.try_get_topics()?;
|
|
|
|
|
|
- let mut remote_subscriptions = self
|
|
|
- .remote_subscriptions
|
|
|
- .write()
|
|
|
- .map_err(|_| Error::Poison)?;
|
|
|
- let mut subscriptions = self.subscriptions.write().map_err(|_| Error::Poison)?;
|
|
|
+ let mut remote_subscriptions = self.remote_subscriptions.write();
|
|
|
+ let mut subscriptions = self.subscriptions.write();
|
|
|
|
|
|
if subscriptions.get(&subscription_name).is_some() {
|
|
|
return Err(Error::AlreadySubscribed);
|
|
|
}
|
|
|
|
|
|
let mut previous_messages = Vec::new();
|
|
|
- let cached_events = self.cached_events.read().unwrap_or_else(|e| {
|
|
|
- self.cached_events.clear_poison();
|
|
|
- e.into_inner()
|
|
|
- });
|
|
|
+ let cached_events = self.cached_events.read();
|
|
|
|
|
|
for topic in topics.iter() {
|
|
|
if let Some(subscription) = remote_subscriptions.get_mut(topic) {
|
|
|
@@ -470,7 +431,7 @@ where
|
|
|
fn drop(&mut self) {
|
|
|
self.still_running
|
|
|
.store(false, std::sync::atomic::Ordering::Release);
|
|
|
- if let Ok(Some(to_stream)) = self.stream_ctrl.read().map(|sender| sender.clone()) {
|
|
|
+ if let Some(to_stream) = self.stream_ctrl.read().as_ref() {
|
|
|
let _ = to_stream.try_send(StreamCtrl::Stop).inspect_err(|err| {
|
|
|
tracing::error!("Failed to send message LongPoll::Stop due to {err:?}")
|
|
|
});
|