Преглед изворни кода

Working on a better and most efficient matching struct for events

Use ConnectionId type instead of u128
Cesar Rodas пре 3 месеци
родитељ
комит
76f16725bf

+ 0 - 1
Cargo.lock

@@ -962,7 +962,6 @@ dependencies = [
  "nostr-rs-memory",
  "nostr-rs-storage-base",
  "nostr-rs-types",
- "rand",
  "serde_json",
  "thiserror",
  "tokio",

+ 0 - 1
crates/relayer/Cargo.toml

@@ -18,7 +18,6 @@ tokio-tungstenite = { version = "0.18.0", features = [
 ] }
 thiserror = "1.0.39"
 serde_json = "1.0.94"
-rand = "0.8.5"
 log = "0.4.17"
 futures = "0.3.30"
 

+ 24 - 8
crates/relayer/src/connection.rs

@@ -1,4 +1,4 @@
-use crate::{get_id, Error};
+use crate::Error;
 use futures_util::{SinkExt, StreamExt};
 use nostr_rs_types::{
     relayer::{Auth, ROk},
@@ -30,6 +30,22 @@ impl Default for ConnectionId {
     }
 }
 
+impl ConnectionId {
+    /// Connection ID for messages from the client pool or any special empty ConnectionId
+    #[inline]
+    pub fn new_empty() -> Self {
+        Self(0)
+    }
+
+    /// Check if the connection id is empty
+    ///
+    /// Empty connection id is used for messages from Client pool to the relayer
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.0 == 0
+    }
+}
+
 #[derive(Debug)]
 /// Relayer connection
 ///
@@ -70,12 +86,12 @@ impl Connection {
 
     /// Create new connection
     pub async fn new_connection(
-        send_message_to_relayer: Sender<(u128, Request)>,
-        disconnection_notify: Option<Sender<u128>>,
+        send_message_to_relayer: Sender<(ConnectionId, Request)>,
+        disconnection_notify: Option<Sender<ConnectionId>>,
         stream: TcpStream,
     ) -> Result<Self, Error> {
         let websocket = accept_async(stream).await?;
-        let conn_id = get_id();
+        let conn_id = Default::default();
         let (sender, receiver) = channel(MAX_SUBSCRIPTIONS_BUFFER);
         let _ = sender.send(Auth::default().into()).await;
         Ok(Self {
@@ -93,11 +109,11 @@ impl Connection {
     }
 
     fn spawn(
-        send_message_to_relayer: Sender<(u128, Request)>,
+        send_message_to_relayer: Sender<(ConnectionId, Request)>,
         websocket: WebSocketStream<TcpStream>,
         mut receiver: Receiver<Response>,
-        disconnection_notify: Option<Sender<u128>>,
-        conn_id: u128,
+        disconnection_notify: Option<Sender<ConnectionId>>,
+        conn_id: ConnectionId,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
             let mut _subscriptions: HashMap<String, (u128, Receiver<Response>)> = HashMap::new();
@@ -163,7 +179,7 @@ impl Connection {
     }
 
     /// Get subscription id for a given subscription id
-    pub async fn get_subscription_id(&self, id: &str) -> Option<u128> {
+    pub async fn get_subscription_id(&self, id: &str) -> Option<ConnectionId> {
         let subscriptions = self.subscriptions.read().await;
         subscriptions.get(id).copied()
     }

+ 0 - 15
crates/relayer/src/lib.rs

@@ -10,7 +10,6 @@
 //! updates.
 #![deny(missing_docs, warnings)]
 
-use rand::Rng;
 mod connection;
 mod error;
 mod relayer;
@@ -19,17 +18,3 @@ mod subscription;
 pub use self::{
     connection::Connection, error::Error, relayer::Relayer, subscription::Subscription,
 };
-
-// Get current nanoseconds and use the last 3 digits as a random number (because
-// sometimes it comes as 0)
-pub(crate) fn get_id() -> u128 {
-    let mut rng = rand::thread_rng();
-    let random_number = rng.gen_range(0..999);
-
-    let ts = std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .expect("time")
-        .as_nanos();
-
-    ts.checked_add(random_number).unwrap_or(ts)
-}

+ 16 - 12
crates/relayer/src/relayer.rs

@@ -1,4 +1,4 @@
-use crate::{Connection, Error, Subscription};
+use crate::{connection::ConnectionId, Connection, Error, Subscription};
 use futures_util::StreamExt;
 use nostr_rs_client::{Error as ClientError, Pool};
 use nostr_rs_storage_base::Storage;
@@ -20,8 +20,7 @@ use tokio::{
     task::JoinHandle,
 };
 
-type SubId = u128;
-
+type SubId = ConnectionId;
 type Subscriptions = HashMap<SubId, (SubscriptionId, Sender<Response>)>;
 
 /// Relayer struct
@@ -45,11 +44,11 @@ pub struct Relayer<T: Storage + Send + Sync + 'static> {
     /// when it is translated in OR filters. It is designed this way to allow a
     /// fast iteration and match quickly filters.
     subscriptions: RwLock<HashMap<Subscription, RwLock<Subscriptions>>>,
-    clients: RwLock<HashMap<u128, Connection>>,
+    clients: RwLock<HashMap<ConnectionId, Connection>>,
     /// This Sender can be used to send requests from anywhere to the relayer.
-    send_to_relayer: Sender<(u128, Request)>,
+    send_to_relayer: Sender<(ConnectionId, Request)>,
     /// This Receiver is the relayer the way the relayer receives messages
-    relayer_receiver: Option<Receiver<(u128, Request)>>,
+    relayer_receiver: Option<Receiver<(ConnectionId, Request)>>,
     /// Client pool
     ///
     /// A relayer can optionally be connected to a pool of clients to get foreign events.
@@ -91,7 +90,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     }
 
     /// Splits the relayer object and extract their receiver.
-    pub fn split(mut self) -> Result<(Self, Receiver<(u128, Request)>), Error> {
+    pub fn split(mut self) -> Result<(Self, Receiver<(ConnectionId, Request)>), Error> {
         let receiver = self.relayer_receiver.take().ok_or(Error::AlreadySplitted)?;
         Ok((self, receiver))
     }
@@ -111,7 +110,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                     },
                     Some((conn_id, request)) = receiver.recv() => {
                         // receive messages from the connection pool
-                        if conn_id == 0 {
+                        if conn_id.is_empty() {
                             // connection pool
                             if let Request::Event(event) = request {
                                 if let Some(storage) = this.storage.as_ref() {
@@ -142,7 +141,7 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
 
     fn handle_client_pool(
         client_pool: Pool,
-        sender: Sender<(u128, Request)>,
+        sender: Sender<(ConnectionId, Request)>,
     ) -> Result<(Pool, JoinHandle<()>), ClientError> {
         let (mut receiver, client_pool) = client_pool.split()?;
 
@@ -151,7 +150,12 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
                 if let Some((response, _)) = receiver.recv().await {
                     match response {
                         Response::Event(event) => {
-                            let _ = sender.send((0, Request::Event(event.event.into()))).await;
+                            let _ = sender
+                                .send((
+                                    ConnectionId::new_empty(),
+                                    Request::Event(event.event.into()),
+                                ))
+                                .await;
                         }
                         x => {
                             println!("x => {:?}", x);
@@ -174,9 +178,9 @@ impl<T: Storage + Send + Sync + 'static> Relayer<T> {
     /// This function will spawn the client's loop to receive incoming messages and send those messages
     pub async fn add_connection(
         &self,
-        disconnection_notify: Option<mpsc::Sender<u128>>,
+        disconnection_notify: Option<mpsc::Sender<ConnectionId>>,
         stream: TcpStream,
-    ) -> Result<u128, Error> {
+    ) -> Result<ConnectionId, Error> {
         let client =
             Connection::new_connection(self.send_to_relayer.clone(), disconnection_notify, stream)
                 .await?;

+ 119 - 0
crates/relayer/src/subscription/filter.rs

@@ -0,0 +1,119 @@
+use nostr_rs_types::types::{Event, Filter, Kind, Tag};
+use std::collections::BTreeSet;
+
+type SortedSet<T> = Option<BTreeSet<T>>;
+
+/// Sorted filter
+///
+/// This is a pre-processed filter that is optimized for fast lookups.
+#[derive(Debug, Clone)]
+pub struct SortedFilter {
+    ids: SortedSet<Vec<u8>>,
+    authors: SortedSet<Vec<u8>>,
+    kinds: SortedSet<Kind>,
+    references_to_event: SortedSet<Vec<u8>>,
+    references_to_public_key: SortedSet<Vec<u8>>,
+}
+
+impl From<Filter> for SortedFilter {
+    /// Converts a Filter into a SortedFilter.
+    ///
+    /// SortedFilters have SortedSets that are optimized for fast lookups.
+    fn from(filter: Filter) -> Self {
+        Self {
+            ids: (!filter.ids.is_empty()).then(|| {
+                filter
+                    .ids
+                    .into_iter()
+                    .map(|x| x.as_ref().to_vec())
+                    .collect()
+            }),
+            authors: (!filter.authors.is_empty()).then(|| {
+                filter
+                    .authors
+                    .into_iter()
+                    .map(|x| x.as_ref().to_vec())
+                    .collect()
+            }),
+            kinds: (!filter.kinds.is_empty()).then(|| filter.kinds.into_iter().collect()),
+            references_to_event: (!filter.references_to_event.is_empty()).then(|| {
+                filter
+                    .references_to_event
+                    .into_iter()
+                    .map(|x| x.as_ref().to_vec())
+                    .collect()
+            }),
+            references_to_public_key: (!filter.references_to_public_key.is_empty()).then(|| {
+                filter
+                    .references_to_public_key
+                    .into_iter()
+                    .map(|x| x.as_ref().to_vec())
+                    .collect()
+            }),
+        }
+    }
+}
+
+impl SortedFilter {
+    /// Checks if a given key exists in the sorted set, either as a whole or as a partial match
+    #[inline]
+    fn has_key_or_partial_match<T: AsRef<[u8]>>(id: T, ids: &SortedSet<Vec<u8>>) -> bool {
+        if let Some(ids) = ids.as_ref() {
+            let id = id.as_ref().to_vec();
+            if ids.contains(&id) {
+                return true;
+            }
+
+            for len in 4..=id.len() {
+                let prev_id = &id[..len].to_vec();
+                if ids.contains(prev_id) {
+                    return true;
+                }
+            }
+            false
+        } else {
+            true
+        }
+    }
+
+    /// Checks if any of key given set exists in the sorted set, either as a whole or as a partial match
+    #[inline]
+    fn has_any_key_or_partial_match<T: Iterator<Item = Vec<u8>>>(
+        id_set: T,
+        ids: &SortedSet<Vec<u8>>,
+    ) -> bool {
+        if ids.is_some() {
+            for id in id_set {
+                if Self::has_key_or_partial_match(id, ids) {
+                    return true;
+                }
+            }
+            false
+        } else {
+            true
+        }
+    }
+
+    /// Checks if the event matches the filter
+    pub fn check(&self, event: &Event) -> bool {
+        self.kinds
+            .as_ref()
+            .map_or(true, |kinds| kinds.contains(&event.kind()))
+            && Self::has_key_or_partial_match(&event.id, &self.ids)
+            && Self::has_key_or_partial_match(&event.author(), &self.authors)
+            && Self::has_any_key_or_partial_match(
+                event.tags().iter().filter_map(|f| match f {
+                    Tag::Event(x) => Some(x.id.as_ref().to_vec()),
+                    _ => None,
+                }),
+                &self.references_to_event,
+            )
+            && Self::has_any_key_or_partial_match(
+                event.tags().iter().filter_map(|f| match f {
+                    Tag::PubKey(x) => Some(x.id.as_ref().to_vec()),
+                    _ => None,
+                }),
+                &self.references_to_public_key,
+            )
+    }
+}

+ 8 - 4
crates/relayer/src/subscription/manager.rs

@@ -1,4 +1,7 @@
-use crate::{connection::ConnectionId, subscription::Key};
+use crate::{
+    connection::ConnectionId,
+    subscription::{Key, Subscription},
+};
 use futures::executor::block_on;
 use nostr_rs_types::{
     relayer,
@@ -19,10 +22,13 @@ pub struct SubscriptionForConnection {
     conn_id: ConnectionId,
     name: SubscriptionId,
     keys: Vec<Key>,
+    subscribe: Subscription,
     manager: Arc<SubscriptionManager>,
 }
 
 impl Drop for SubscriptionForConnection {
+    /// When the subscription is dropped, it will remove the listener from the
+    /// subscription manager
     fn drop(&mut self) {
         let keys = self
             .keys
@@ -75,11 +81,9 @@ impl SubscriptionManager {
             let subscriptions = subscriptions.read().await;
             let subs = Self::get_keys_from_event(&event);
 
-            let empty_sub_id = SubscriptionId::default();
-
             for sub in subs {
                 for ((sub_type, _, sub_id), sender) in
-                    subscriptions.range(&(sub.clone(), ConnectionId(0), empty_sub_id.clone())..)
+                    subscriptions.range(&(sub.clone(), ConnectionId(0), SubscriptionId::empty())..)
                 {
                     if sub_type != &sub {
                         break;

+ 1 - 0
crates/relayer/src/subscription/mod.rs

@@ -1,5 +1,6 @@
 use nostr_rs_types::types::{Event, Filter, Tag};
 
+mod filter;
 mod manager;
 
 /// The subscription keys are used to quickly identify the subscriptions

+ 2 - 2
crates/types/src/types/addr.rs

@@ -39,7 +39,7 @@ pub enum Error {
 /// Human Readable Part
 ///
 /// Which HDR has been used to encode this address with Bech32
-#[derive(Debug, Clone, PartialEq, Eq, Copy)]
+#[derive(Debug, Clone, Ord, PartialOrd, PartialEq, Eq, Copy)]
 pub enum HumanReadablePart {
     /// Public Key / Account Address
     NPub,
@@ -67,7 +67,7 @@ impl Display for HumanReadablePart {
 ///
 /// Clients may want to use the Bech32 encoded address *but* the protocol only
 /// cares about hex-encoded binary data.
-#[derive(Debug, Default, Clone, Eq)]
+#[derive(Debug, Default, Ord, PartialOrd, Clone, Eq)]
 pub struct Addr {
     /// Bytes (up to 32 bytes)
     pub bytes: Vec<u8>,

+ 7 - 0
crates/types/src/types/subscription_id.rs

@@ -26,6 +26,13 @@ pub enum Error {
 #[derive(Debug, Ord, PartialOrd, Clone, Hash, PartialEq, Eq)]
 pub struct SubscriptionId(String);
 
+impl SubscriptionId {
+    /// Create a new subscription ID
+    pub fn empty() -> Self {
+        Self("".to_owned())
+    }
+}
+
 impl Deref for SubscriptionId {
     type Target = str;