| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 | //! Creates a worker thread and exposes a sender to communidate withuse async_trait::async_trait;use std::{    ops::Deref,    sync::{atomic::AtomicBool, Arc},    time::Duration,};use tokio::{    sync::mpsc::{channel, Sender},    time::sleep,};const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;const WORKER_BUFFER_SIZE: usize = 1_000;/// Worker trait////// The worker trait has the definition of the code the worker has to perform in a different thread#[async_trait]pub trait Worker: Send + Sync {    type Payload: Send + Sync;    /// Method to be executed with a given task    async fn handler(&self, payload: Self::Payload);}/// Worker manager////// The worker manager manages the instances of the Worker trait, which is executed asynchronously/// in a separate thread from the send() context.////// The logic of having one or more instances of the Worker trait is abstracted in this structure.////// TODO:///  * Boot the spawn thread on demand (right now it is at instantiation time)///  * Respawn on  the worker thread need (while the Worker is in memory / not dropped)///  * Kill off the worker thread after a long time without any activity. This will reboot (step 1)///    when needed#[derive(Debug)]pub struct WorkerManager<W: Worker> {    sender: Sender<W::Payload>,    is_running: Arc<AtomicBool>,    worker: Arc<W>,}impl<W: Worker> Drop for WorkerManager<W> {    fn drop(&mut self) {        self.is_running            .store(false, std::sync::atomic::Ordering::Release);    }}impl<W: Worker> Deref for WorkerManager<W> {    type Target = Arc<W>;    fn deref(&self) -> &Self::Target {        &self.worker    }}impl<W: Worker + 'static> WorkerManager<W> {    /// Creates a new WorkerManager given a struct that implements the Worker trait    pub fn new(worker: W) -> Self {        let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);        let worker = Arc::new(worker);        let is_running = Arc::new(AtomicBool::new(true));        let worker_for_thread = worker.clone();        let worker_in_scope = is_running.clone();        tokio::spawn(async move {            loop {                tokio::select! {                    Some(message) = receiver.recv() => {                        worker_for_thread.handler(message).await                    }                    _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}                }                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) {                    break;                }            }        });        Self {            sender,            is_running,            worker,        }    }    /// Sends a message to be processed in another thread    pub fn send(&self, message: W::Payload) {        self.sender.try_send(message).expect("foo");    }}
 |