Эх сурвалжийг харах

Improved the WorkerManager

 * [x] Boot the spawn thread on demand (right now it is at instantiation time)
 * [x] Respawn on  the worker thread need (while the Worker is in memory / not dropped)
 * [x] Kill off the worker thread after a long time without any activity. This will reboot (step 1) when needed
Cesar Rodas 10 сар өмнө
parent
commit
1362ae6386
1 өөрчлөгдсөн 43 нэмэгдсэн , 23 устгасан
  1. 43 23
      utxo/src/worker.rs

+ 43 - 23
utxo/src/worker.rs

@@ -1,24 +1,34 @@
 //! Creates a worker thread and exposes a sender to communidate with
 use async_trait::async_trait;
+use chrono::Utc;
 use std::{
     ops::Deref,
     sync::{atomic::AtomicBool, Arc},
     time::Duration,
 };
 use tokio::{
-    sync::mpsc::{channel, Sender},
+    sync::{
+        mpsc::{channel, error::TrySendError, Sender},
+        Mutex,
+    },
     time::sleep,
 };
 
+/// Time to awake the main thread to check if the parent struct is still in memory if it was dropped
+/// already. If it was dropped the main thread has to be stopped.
 const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;
+/// The maximum size for buffering messages
 const WORKER_BUFFER_SIZE: usize = 1_000;
+/// The maximum time to be idle waiting for requests to be processed. After this is reached the main
+/// loop is stopped.
+const MAXIMUM_IDLE_TIME_SEC: i64 = 60;
 
 /// Worker trait
 ///
 /// The worker trait has the definition of the code the worker has to perform in a different thread
 #[async_trait]
 pub trait Worker: Send + Sync {
-    type Payload: Send + Sync;
+    type Payload: Send + Sync + Clone;
 
     /// Method to be executed with a given task
     async fn handler(&self, payload: Self::Payload);
@@ -35,15 +45,9 @@ pub trait Worker: Send + Sync {
 /// in a separate thread from the send() context.
 ///
 /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
-///
-/// TODO:
-///  * Boot the spawn thread on demand (right now it is at instantiation time)
-///  * Respawn on  the worker thread need (while the Worker is in memory / not dropped)
-///  * Kill off the worker thread after a long time without any activity. This will reboot (step 1)
-///    when needed
 #[derive(Debug)]
 pub struct WorkerManager<W: Worker> {
-    sender: Sender<W::Payload>,
+    sender: Mutex<Option<Sender<W::Payload>>>,
     is_running: Arc<AtomicBool>,
     worker: Arc<W>,
 }
@@ -66,38 +70,54 @@ impl<W: Worker> Deref for WorkerManager<W> {
 impl<W: Worker + 'static> WorkerManager<W> {
     /// Creates a new WorkerManager given a struct that implements the Worker trait
     pub fn new(worker: W) -> Self {
-        let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
-        let worker = Arc::new(worker);
-        let is_running = Arc::new(AtomicBool::new(true));
+        Self {
+            sender: Mutex::new(None),
+            is_running: Arc::new(true.into()),
+            worker: Arc::new(worker),
+        }
+    }
 
-        let worker_for_thread = worker.clone();
-        let worker_in_scope = is_running.clone();
+    fn start_background_worker(&self) -> Sender<W::Payload> {
+        let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
+        let worker_for_thread = self.worker.clone();
+        let worker_in_scope = self.is_running.clone();
         tokio::spawn(async move {
+            let mut last_time = Utc::now();
             loop {
                 tokio::select! {
                     Some(message) = receiver.recv() => {
-                        worker_for_thread.handler(message).await
+                        worker_for_thread.handler(message).await;
+                        last_time = Utc::now();
                     }
                     _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
                 }
 
-                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) {
+                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
+                    || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
+                {
                     break;
                 }
             }
         });
-
-        Self {
-            sender,
-            is_running,
-            worker,
-        }
+        sender
     }
 
     /// Sends a message to be processed in another thread
     pub async fn process(&self, message: W::Payload) {
         if self.worker.process_request().await {
-            self.sender.try_send(message).expect("foo");
+            let mut sender = self.sender.lock().await;
+            match sender
+                .as_ref()
+                .map(|sender| sender.try_send(message.clone()))
+            {
+                None | Some(Err(TrySendError::Closed(_))) => {
+                    // Either there is no running worker thread and it is closed already
+                    let new_worker = self.start_background_worker();
+                    let _ = new_worker.try_send(message);
+                    *sender = Some(new_worker);
+                }
+                _ => {}
+            }
         }
     }
 }