//! Creates a worker thread and exposes a sender to communicate with it.

use async_trait::async_trait;
use chrono::Utc;
use parking_lot::RwLock;
use std::{
    ops::Deref,
    sync::{atomic::AtomicBool, Arc},
    time::Duration,
};
use tokio::{
    sync::mpsc::{channel, error::TrySendError, Sender},
    time::sleep,
};

/// Interval, in milliseconds, at which the worker loop wakes up to check whether the parent
/// struct is still in memory. If the parent was dropped, the loop has to stop.
const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;

/// Maximum number of messages that can be buffered while waiting to be processed.
const WORKER_BUFFER_SIZE: usize = 1_000;

/// Maximum time, in seconds, that the worker loop may sit idle waiting for requests. Once this
/// is reached the loop is meant to stop.
const MAXIMUM_IDLE_TIME_SEC: i64 = 60;

/// Worker trait
///
/// Defines the code that a worker performs in the background, away from the caller that
/// submitted the payload.
#[async_trait]
pub trait Worker: Send + Sync {
    /// The message type handed to [`Worker::handler`].
    type Payload: Send + Sync + Clone;

    /// Executes the work associated with a single payload.
    async fn handler(&self, payload: Self::Payload);

    /// Whether or not requests should be processed at all.
    ///
    /// The default implementation always returns `true`.
    fn process_request(&self) -> bool {
        true
    }
}

/// Worker manager
///
/// The worker manager owns an instance of the [`Worker`] trait and executes it asynchronously,
/// in a separate context from the caller of `process()`.
///
/// The logic of having one or more instances of the Worker trait is abstracted in this structure.
#[derive(Debug)] pub struct WorkerManager { sender: RwLock>>, is_running: Arc, worker: Arc, } impl Drop for WorkerManager { fn drop(&mut self) { self.is_running .store(false, std::sync::atomic::Ordering::Release); } } impl Deref for WorkerManager { type Target = Arc; fn deref(&self) -> &Self::Target { &self.worker } } impl WorkerManager { /// Creates a new WorkerManager given a struct that implements the Worker trait pub fn new(worker: W) -> Self { Self { sender: RwLock::new(None), is_running: Arc::new(true.into()), worker: Arc::new(worker), } } fn start_background_worker(&self) -> Sender { let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE); let worker_for_thread = self.worker.clone(); let worker_in_scope = self.is_running.clone(); tokio::spawn(async move { let mut last_time = Utc::now(); loop { tokio::select! { Some(message) = receiver.recv() => { worker_for_thread.handler(message).await; last_time = Utc::now(); } _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {} } if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC { break; } } }); sender } /// Sends a message to be processed in another thread pub fn process(&self, message: W::Payload) { if self.worker.process_request() { let sender = self.sender.read(); match sender .as_ref() .map(|sender| sender.try_send(message.clone())) { None | Some(Err(TrySendError::Closed(_))) => { drop(sender); let mut sender = self.sender.write(); if let Some(sender) = sender.as_ref() { // Check if another faster thread did not set sender to Some already let _ = sender.try_send(message.clone()); } else { // Either there is no running worker thread and it is closed already, and // this thread is fastest and therefore will start the background worker and // will set the sender to Some let new_worker = self.start_background_worker(); let _ = new_worker.try_send(message); *sender = Some(new_worker); } } _ => {} } } } }