123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- //! Creates a worker thread and exposes a sender to communidate with
- use async_trait::async_trait;
- use std::{
- ops::Deref,
- sync::{atomic::AtomicBool, Arc},
- time::Duration,
- };
- use tokio::{
- sync::mpsc::{channel, Sender},
- time::sleep,
- };
- const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;
- const WORKER_BUFFER_SIZE: usize = 1_000;
- /// Worker trait
- ///
- /// The worker trait has the definition of the code the worker has to perform in a different thread
- #[async_trait]
- pub trait Worker: Send + Sync {
- type Payload: Send + Sync;
- /// Method to be executed with a given task
- async fn handler(&self, payload: Self::Payload);
- }
- /// Worker manager
- ///
- /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
- /// in a separate thread from the send() context.
- ///
- /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
- ///
- /// TODO:
- /// * Boot the spawn thread on demand (right now it is at instantiation time)
- /// * Respawn on the worker thread need (while the Worker is in memory / not dropped)
- /// * Kill off the worker thread after a long time without any activity. This will reboot (step 1)
- /// when needed
- #[derive(Debug)]
- pub struct WorkerManager<W: Worker> {
- sender: Sender<W::Payload>,
- is_running: Arc<AtomicBool>,
- worker: Arc<W>,
- }
- impl<W: Worker> Drop for WorkerManager<W> {
- fn drop(&mut self) {
- self.is_running
- .store(false, std::sync::atomic::Ordering::Release);
- }
- }
- impl<W: Worker> Deref for WorkerManager<W> {
- type Target = Arc<W>;
- fn deref(&self) -> &Self::Target {
- &self.worker
- }
- }
- impl<W: Worker + 'static> WorkerManager<W> {
- /// Creates a new WorkerManager given a struct that implements the Worker trait
- pub fn new(worker: W) -> Self {
- let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
- let worker = Arc::new(worker);
- let is_running = Arc::new(AtomicBool::new(true));
- let worker_for_thread = worker.clone();
- let worker_in_scope = is_running.clone();
- tokio::spawn(async move {
- loop {
- tokio::select! {
- Some(message) = receiver.recv() => {
- worker_for_thread.handler(message).await
- }
- _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {}
- }
- if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) {
- break;
- }
- }
- });
- Self {
- sender,
- is_running,
- worker,
- }
- }
- /// Sends a message to be processed in another thread
- pub fn send(&self, message: W::Payload) {
- self.sender.try_send(message).expect("foo");
- }
- }
|