//! Creates a worker thread and exposes a sender to communidate with use async_trait::async_trait; use std::{ ops::Deref, sync::{atomic::AtomicBool, Arc}, time::Duration, }; use tokio::{ sync::mpsc::{channel, Sender}, time::sleep, }; const CHECK_WORKER_IN_SCOPE_MS: u64 = 50; const WORKER_BUFFER_SIZE: usize = 1_000; /// Worker trait /// /// The worker trait has the definition of the code the worker has to perform in a different thread #[async_trait] pub trait Worker: Send + Sync { type Payload: Send + Sync; /// Method to be executed with a given task async fn handler(&self, payload: Self::Payload); } /// Worker manager /// /// The worker manager manages the instances of the Worker trait, which is executed asynchronously /// in a separate thread from the send() context. /// /// The logic of having one or more instances of the Worker trait is abstracted in this structure. /// /// TODO: /// * Boot the spawn thread on demand (right now it is at instantiation time) /// * Respawn on the worker thread need (while the Worker is in memory / not dropped) /// * Kill off the worker thread after a long time without any activity. This will reboot (step 1) /// when needed #[derive(Debug)] pub struct WorkerManager { sender: Sender, is_running: Arc, worker: Arc, } impl Drop for WorkerManager { fn drop(&mut self) { self.is_running .store(false, std::sync::atomic::Ordering::Release); } } impl Deref for WorkerManager { type Target = Arc; fn deref(&self) -> &Self::Target { &self.worker } } impl WorkerManager { /// Creates a new WorkerManager given a struct that implements the Worker trait pub fn new(worker: W) -> Self { let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE); let worker = Arc::new(worker); let is_running = Arc::new(AtomicBool::new(true)); let worker_for_thread = worker.clone(); let worker_in_scope = is_running.clone(); tokio::spawn(async move { loop { tokio::select! { Some(message) = receiver.recv() => { worker_for_thread.handler(message).await } _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {} } if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) { break; } } }); Self { sender, is_running, worker, } } /// Sends a message to be processed in another thread pub fn send(&self, message: W::Payload) { self.sender.try_send(message).expect("foo"); } }