|
@@ -0,0 +1,80 @@
|
|
|
+use nostr_rs_storage::RocksDb;
|
|
|
+use nostr_rs_types::{client::Event, Request};
|
|
|
+use parking_lot::{RwLock, RwLockReadGuard};
|
|
|
+use std::collections::HashMap;
|
|
|
+use tokio::sync::mpsc::Sender;
|
|
|
+
|
|
|
+#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
|
|
|
+pub struct SubscriptionType {
|
|
|
+ public_key: Option<Vec<u8>>,
|
|
|
+ id: Option<Vec<u8>>,
|
|
|
+ kind: Option<u32>,
|
|
|
+}
|
|
|
+
|
|
|
+type Subscriptions = HashMap<u128, Sender<Event>>;
|
|
|
+
|
|
|
+pub struct Relayer {
|
|
|
+ storage: RocksDb,
|
|
|
+ subscriptions: RwLock<HashMap<SubscriptionType, RwLock<Subscriptions>>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Relayer {
|
|
|
+ pub fn new(storage: RocksDb) -> Self {
|
|
|
+ Self {
|
|
|
+ storage,
|
|
|
+ subscriptions: RwLock::new(HashMap::new()),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn recv(&self, request: Request) {
|
|
|
+ match request {
|
|
|
+ Request::Event(event) => {
|
|
|
+ let _ = self.storage.store(&event);
|
|
|
+ self.broadcast(event).await;
|
|
|
+ }
|
|
|
+ Request::Request(subscribe) => {}
|
|
|
+ Request::Close(close) => {}
|
|
|
+ _ => {}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ #[inline]
|
|
|
+ fn get_possible_listeners_from_event(event: &Event) -> Vec<SubscriptionType> {
|
|
|
+ let kind = event.kind().into();
|
|
|
+ let public_keys = [None, Some(event.author().as_ref().to_vec())];
|
|
|
+ let id = [None, Some(event.id.as_ref().to_vec())];
|
|
|
+ let kind = [None, Some(kind)];
|
|
|
+ let mut subs = vec![];
|
|
|
+
|
|
|
+ for public_key in public_keys.iter() {
|
|
|
+ for id in id.iter() {
|
|
|
+ for kind in kind.iter() {
|
|
|
+ subs.push(SubscriptionType {
|
|
|
+ public_key: public_key.clone(),
|
|
|
+ id: id.clone(),
|
|
|
+ kind: *kind,
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ subs
|
|
|
+ }
|
|
|
+
|
|
|
+ #[inline]
|
|
|
+ fn broadcast_to_subscribers(subscriptions: RwLockReadGuard<Subscriptions>, event: &Event) {
|
|
|
+ for (_, receiver) in subscriptions.iter() {
|
|
|
+ let _ = receiver.try_send(event.clone());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn broadcast(&self, event: Event) {
|
|
|
+ let subscriptions = self.subscriptions.read();
|
|
|
+
|
|
|
+ for subscription_type in Self::get_possible_listeners_from_event(&event) {
|
|
|
+ if let Some(subscribers) = subscriptions.get(&subscription_type) {
|
|
|
+ Self::broadcast_to_subscribers(subscribers.read(), &event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|