//! Creates a worker thread and exposes a sender to communidate with use async_trait::async_trait; use std::{ops::Deref, sync::Arc}; use tokio::{ sync::mpsc::{channel, Sender}, task::JoinHandle, }; /// The maximum size for buffering messages const WORKER_BUFFER_SIZE: usize = 1_000; /// Worker trait /// /// The worker trait has the definition of the code the worker has to perform in a different thread #[async_trait] pub trait Worker: Send + Sync { type Payload: Send + Sync + Clone; /// Method to be executed with a given task async fn handle(&self, payload: Self::Payload); /// Whether or not to process the request fn process_request(&self) -> bool { true } } /// Worker manager /// /// The worker manager manages the instances of the Worker trait, which is executed asynchronously /// in a separate thread from the send() context. /// /// The logic of having one or more instances of the Worker trait is abstracted in this structure. #[derive(Debug)] pub struct WorkerManager { sender: Sender, worker_thread: JoinHandle<()>, worker: Arc, } impl Drop for WorkerManager { fn drop(&mut self) { self.worker_thread.abort(); } } impl Deref for WorkerManager { type Target = Arc; fn deref(&self) -> &Self::Target { &self.worker } } impl WorkerManager { /// Creates a new WorkerManager given a struct that implements the Worker trait pub fn new(worker: W) -> Self { let worker = Arc::new(worker); let (worker_thread, sender) = WorkerManager::start_background_worker(worker.clone()); Self { sender, worker_thread, worker, } } fn start_background_worker(worker: Arc) -> (JoinHandle<()>, Sender) { let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE); ( tokio::spawn(async move { while let Some(message) = receiver.recv().await { worker.handle(message).await; } }), sender, ) } /// Sends a message to be processed in another thread pub fn process(&self, message: W::Payload) { if self.worker.process_request() { let _ = self.sender.try_send(message); } } }