Переглянути джерело

Improve WorkerManager

Use the JoinHandle to abort instead of awaking the bg tasks every 50ms to kill
it for inactivity
Cesar Rodas 9 місяців тому
батько
коміт
6bec4af7c0
4 змінених файлів з 22 додано та 71 видалено
  1. 0 1
      Cargo.lock
  2. 0 1
      utxo/Cargo.toml
  3. 1 1
      utxo/src/broadcaster.rs
  4. 21 68
      utxo/src/worker.rs

+ 0 - 1
Cargo.lock

@@ -2466,7 +2466,6 @@ dependencies = [
  "chrono",
  "futures",
  "hmac",
- "parking_lot",
  "rand",
  "serde",
  "sha2",

+ 0 - 1
utxo/Cargo.toml

@@ -10,7 +10,6 @@ borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
 hmac = "0.12.1"
-parking_lot = "0.12.2"
 rand = "0.8.5"
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"

+ 1 - 1
utxo/src/broadcaster.rs

@@ -73,7 +73,7 @@ impl Worker for Broadcaster {
             .load(std::sync::atomic::Ordering::Acquire)
     }
 
-    async fn handler(&self, transaction: Self::Payload) {
+    async fn handle(&self, transaction: Self::Payload) {
         let listeners = self.subscriptions.read().await;
         let senders = self.subscribers.read().await;
 

+ 21 - 68
utxo/src/worker.rs

@@ -1,25 +1,13 @@
 //! Creates a worker thread and exposes a sender to communidate with
 use async_trait::async_trait;
-use chrono::Utc;
-use parking_lot::RwLock;
-use std::{
-    ops::Deref,
-    sync::{atomic::AtomicBool, Arc},
-    time::Duration,
-};
+use std::{ops::Deref, sync::Arc};
 use tokio::{
-    sync::mpsc::{channel, error::TrySendError, Sender},
-    time::sleep,
+    sync::mpsc::{channel, Sender},
+    task::JoinHandle,
 };
 
-/// Time to awake the main thread to check if the parent struct is still in memory if it was dropped
-/// already. If it was dropped the main thread has to be stopped.
-const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;
 /// The maximum size for buffering messages
 const WORKER_BUFFER_SIZE: usize = 1_000;
-/// The maximum time to be idle waiting for requests to be processed. After this is reached the main
-/// loop is stopped.
-const MAXIMUM_IDLE_TIME_SEC: i64 = 60;
 
 /// Worker trait
 ///
@@ -29,7 +17,7 @@ pub trait Worker: Send + Sync {
     type Payload: Send + Sync + Clone;
 
     /// Method to be executed with a given task
-    async fn handler(&self, payload: Self::Payload);
+    async fn handle(&self, payload: Self::Payload);
 
     /// Whether or not to process the request
     fn process_request(&self) -> bool {
@@ -45,15 +33,14 @@ pub trait Worker: Send + Sync {
 /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
 #[derive(Debug)]
 pub struct WorkerManager<W: Worker> {
-    sender: RwLock<Option<Sender<W::Payload>>>,
-    is_running: Arc<AtomicBool>,
+    sender: Sender<W::Payload>,
+    worker_thread: JoinHandle<()>,
     worker: Arc<W>,
 }
 
 impl<W: Worker> Drop for WorkerManager<W> {
     fn drop(&mut self) {
-        self.is_running
-            .store(false, std::sync::atomic::Ordering::Release);
+        self.worker_thread.abort();
     }
 }
 
@@ -68,65 +55,31 @@ impl<W: Worker> Deref for WorkerManager<W> {
 impl<W: Worker + 'static> WorkerManager<W> {
     /// Creates a new WorkerManager given a struct that implements the Worker trait
     pub fn new(worker: W) -> Self {
+        let worker = Arc::new(worker);
+        let (worker_thread, sender) = WorkerManager::start_background_worker(worker.clone());
         Self {
-            sender: RwLock::new(None),
-            is_running: Arc::new(true.into()),
-            worker: Arc::new(worker),
+            sender,
+            worker_thread,
+            worker,
         }
     }
 
-    fn start_background_worker(&self) -> Sender<W::Payload> {
+    fn start_background_worker(worker: Arc<W>) -> (JoinHandle<()>, Sender<W::Payload>) {
         let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
-        let worker_for_thread = self.worker.clone();
-        let worker_in_scope = self.is_running.clone();
-        tokio::spawn(async move {
-            let mut last_time = Utc::now();
-            loop {
-                tokio::select! {
-                    Some(message) = receiver.recv() => {
-                        worker_for_thread.handler(message).await;
-                        last_time = Utc::now();
-                    }
-                    _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
-                }
-
-                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
-                    || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
-                {
-                    break;
+        (
+            tokio::spawn(async move {
+                while let Some(message) = receiver.recv().await {
+                    worker.handle(message).await;
                 }
-            }
-        });
-        sender
+            }),
+            sender,
+        )
     }
 
     /// Sends a message to be processed in another thread
     pub fn process(&self, message: W::Payload) {
         if self.worker.process_request() {
-            let sender = self.sender.read();
-            match sender
-                .as_ref()
-                .map(|sender| sender.try_send(message.clone()))
-            {
-                None | Some(Err(TrySendError::Closed(_))) => {
-                    drop(sender);
-
-                    let mut sender = self.sender.write();
-
-                    if let Some(sender) = sender.as_ref() {
-                        // Check if another faster thread did not set sender to Some already
-                        let _ = sender.try_send(message.clone());
-                    } else {
-                        // Either there is no running worker thread and it is closed already, and
-                        // this thread is fastest and therefore will start the background worker and
-                        // will set the sender to Some
-                        let new_worker = self.start_background_worker();
-                        let _ = new_worker.try_send(message);
-                        *sender = Some(new_worker);
-                    }
-                }
-                _ => {}
-            }
+            let _ = self.sender.try_send(message);
         }
     }
 }