|
@@ -1,12 +1,12 @@
|
|
|
-use super::filter::{Key, SortedFilter};
|
|
|
use crate::connection::ConnectionId;
|
|
|
+use nostr_rs_storage_base::{EventFilter, Index, SingleIndex};
|
|
|
use nostr_rs_types::{
|
|
|
client::Subscribe,
|
|
|
types::{Event, SubscriptionId},
|
|
|
Response,
|
|
|
};
|
|
|
use std::{
|
|
|
- collections::{BTreeMap, BTreeSet},
|
|
|
+ collections::{BTreeMap, HashSet},
|
|
|
sync::{
|
|
|
atomic::{AtomicUsize, Ordering},
|
|
|
Arc,
|
|
@@ -14,7 +14,7 @@ use std::{
|
|
|
};
|
|
|
use tokio::sync::{mpsc::Sender, RwLock};
|
|
|
|
|
|
-type SubIdx = (Key, ConnectionId, SubscriptionId);
|
|
|
+type SubIdx = (SingleIndex, ConnectionId, SubscriptionId);
|
|
|
|
|
|
pub const MIN_PREFIX_MATCH_LEN: usize = 2;
|
|
|
|
|
@@ -27,7 +27,7 @@ pub const MIN_PREFIX_MATCH_LEN: usize = 2;
|
|
|
pub struct ActiveSubscription {
|
|
|
conn_id: ConnectionId,
|
|
|
name: SubscriptionId,
|
|
|
- keys: Vec<Key>,
|
|
|
+ indexes: Vec<SingleIndex>,
|
|
|
manager: Arc<SubscriptionManager>,
|
|
|
}
|
|
|
|
|
@@ -35,13 +35,13 @@ impl ActiveSubscription {
|
|
|
fn new(
|
|
|
conn_id: ConnectionId,
|
|
|
name: SubscriptionId,
|
|
|
- keys: Vec<Key>,
|
|
|
+ index: Index,
|
|
|
manager: Arc<SubscriptionManager>,
|
|
|
) -> Self {
|
|
|
Self {
|
|
|
conn_id,
|
|
|
name,
|
|
|
- keys,
|
|
|
+ indexes: index.split(),
|
|
|
manager,
|
|
|
}
|
|
|
}
|
|
@@ -51,8 +51,8 @@ impl Drop for ActiveSubscription {
|
|
|
/// When the subscription is dropped, it will remove the listener from the
|
|
|
/// subscription manager
|
|
|
fn drop(&mut self) {
|
|
|
- let keys = self
|
|
|
- .keys
|
|
|
+ let indexes = self
|
|
|
+ .indexes
|
|
|
.drain(..)
|
|
|
.map(|key| (key, self.conn_id, self.name.clone()))
|
|
|
.collect::<Vec<_>>();
|
|
@@ -60,12 +60,12 @@ impl Drop for ActiveSubscription {
|
|
|
let manager = self.manager.clone();
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
- manager.unsubscribe(keys).await;
|
|
|
+ manager.unsubscribe(indexes).await;
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-type SubscriptionValue = (Arc<SortedFilter>, Sender<Response>);
|
|
|
+type SubscriptionValue = (Arc<EventFilter>, Sender<Response>);
|
|
|
|
|
|
/// Subscription manager
|
|
|
///
|
|
@@ -104,44 +104,19 @@ impl SubscriptionManager {
|
|
|
self.total_subscribers.fetch_sub(1, Ordering::Relaxed);
|
|
|
}
|
|
|
|
|
|
- fn get_keys_from_event(event: &Event, min_prefix_match_len: usize) -> Vec<Key> {
|
|
|
+ fn get_keys_from_event(event: &Event, _min_prefix_match_len: usize) -> Vec<SingleIndex> {
|
|
|
let mut subscriptions = vec![];
|
|
|
|
|
|
- let author = event.author().as_ref().to_vec();
|
|
|
- let id = event.id.as_ref().to_vec();
|
|
|
-
|
|
|
- for i in min_prefix_match_len..=author.len() {
|
|
|
- subscriptions.push(Key::Author(author[..i].to_vec(), None));
|
|
|
- subscriptions.push(Key::Author(author[..i].to_vec(), Some(event.kind())));
|
|
|
- }
|
|
|
+ subscriptions.push(SingleIndex::Author(event.author().to_owned()));
|
|
|
+ subscriptions.push(SingleIndex::Id(event.id.to_owned()));
|
|
|
|
|
|
for t in event.tags() {
|
|
|
- match t {
|
|
|
- nostr_rs_types::types::Tag::Event(ref_event) => {
|
|
|
- for i in min_prefix_match_len..=ref_event.id.len() {
|
|
|
- subscriptions.push(Key::RefId(ref_event.id[..i].to_vec(), None));
|
|
|
- subscriptions
|
|
|
- .push(Key::RefId(ref_event.id[..i].to_vec(), Some(event.kind())));
|
|
|
- }
|
|
|
- }
|
|
|
- nostr_rs_types::types::Tag::PubKey(ref_pub_key) => {
|
|
|
- for i in min_prefix_match_len..=ref_pub_key.id.len() {
|
|
|
- subscriptions.push(Key::RefId(ref_pub_key.id[..i].to_vec(), None));
|
|
|
- subscriptions
|
|
|
- .push(Key::RefId(ref_pub_key.id[..i].to_vec(), Some(event.kind())));
|
|
|
- }
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- for i in min_prefix_match_len..=id.len() {
|
|
|
- subscriptions.push(Key::Id(id[..i].to_vec()));
|
|
|
+ t.get_indexable_value()
|
|
|
+ .map(|v| subscriptions.push(SingleIndex::Tag(t.get_identifier().to_owned(), v)));
|
|
|
}
|
|
|
|
|
|
- subscriptions.push(Key::Kind(event.kind()));
|
|
|
- subscriptions.push(Key::AllUpdates);
|
|
|
-
|
|
|
+ subscriptions.push(SingleIndex::Kind(event.kind()));
|
|
|
+ subscriptions.push(SingleIndex::AllUpdates);
|
|
|
subscriptions
|
|
|
}
|
|
|
|
|
@@ -165,11 +140,13 @@ impl SubscriptionManager {
|
|
|
let subscriptions = request
|
|
|
.filters
|
|
|
.into_iter()
|
|
|
- .map(|filter| {
|
|
|
- let filter = Arc::new(SortedFilter::from(filter));
|
|
|
+ .map(|mut filter| {
|
|
|
+ let index: Index = (&mut filter).into();
|
|
|
+ let filter = Arc::new(EventFilter::from(filter));
|
|
|
let subscription =
|
|
|
- ActiveSubscription::new(conn_id, name.clone(), filter.keys(), self.clone());
|
|
|
- for key in subscription.keys.iter() {
|
|
|
+ ActiveSubscription::new(conn_id, name.clone(), index, self.clone());
|
|
|
+
|
|
|
+ for key in subscription.indexes.iter() {
|
|
|
subscriptions.insert(
|
|
|
(key.clone(), conn_id, name.clone()),
|
|
|
(filter.clone(), sender.clone()),
|
|
@@ -189,7 +166,7 @@ impl SubscriptionManager {
|
|
|
tokio::spawn(async move {
|
|
|
let subscriptions = this.subscriptions.read().await;
|
|
|
let subs = Self::get_keys_from_event(&event, this.min_prefix_match_len);
|
|
|
- let mut deliverded = BTreeSet::new();
|
|
|
+ let mut deliverded = HashSet::new();
|
|
|
|
|
|
for sub in subs {
|
|
|
for ((sub_type, client, name), (filter, sender)) in subscriptions.range(
|
|
@@ -203,7 +180,7 @@ impl SubscriptionManager {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- if deliverded.contains(client) || !filter.check(&event) {
|
|
|
+ if deliverded.contains(client) || !filter.check_event(&event) {
|
|
|
continue;
|
|
|
}
|
|
|
|