123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- //! Creates a worker thread and exposes a sender to communicate with it
- use async_trait::async_trait;
- use chrono::Utc;
- use parking_lot::RwLock;
- use std::{
- ops::Deref,
- sync::{atomic::AtomicBool, Arc},
- time::Duration,
- };
- use tokio::{
- sync::mpsc::{channel, error::TrySendError, Sender},
- time::sleep,
- };
/// Interval, in milliseconds, at which the worker loop wakes up to check whether the parent
/// struct is still in memory. Once the parent has been dropped the worker loop has to stop.
const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;

/// Maximum number of messages that can be buffered while waiting to be processed.
const WORKER_BUFFER_SIZE: usize = 1_000;

/// Maximum time, in seconds, the worker may stay idle waiting for requests. Once this is
/// exceeded the worker loop is stopped.
const MAXIMUM_IDLE_TIME_SEC: i64 = 60;
- /// Worker trait
- ///
- /// The worker trait has the definition of the code the worker has to perform in a different thread
- #[async_trait]
- pub trait Worker: Send + Sync {
- type Payload: Send + Sync + Clone;
- /// Method to be executed with a given task
- async fn handler(&self, payload: Self::Payload);
- /// Whether or not to process the request
- fn process_request(&self) -> bool {
- true
- }
- }
- /// Worker manager
- ///
- /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
- /// in a separate thread from the send() context.
- ///
- /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
- #[derive(Debug)]
- pub struct WorkerManager<W: Worker> {
- sender: RwLock<Option<Sender<W::Payload>>>,
- is_running: Arc<AtomicBool>,
- worker: Arc<W>,
- }
- impl<W: Worker> Drop for WorkerManager<W> {
- fn drop(&mut self) {
- self.is_running
- .store(false, std::sync::atomic::Ordering::Release);
- }
- }
- impl<W: Worker> Deref for WorkerManager<W> {
- type Target = Arc<W>;
- fn deref(&self) -> &Self::Target {
- &self.worker
- }
- }
- impl<W: Worker + 'static> WorkerManager<W> {
- /// Creates a new WorkerManager given a struct that implements the Worker trait
- pub fn new(worker: W) -> Self {
- Self {
- sender: RwLock::new(None),
- is_running: Arc::new(true.into()),
- worker: Arc::new(worker),
- }
- }
- fn start_background_worker(&self) -> Sender<W::Payload> {
- let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
- let worker_for_thread = self.worker.clone();
- let worker_in_scope = self.is_running.clone();
- tokio::spawn(async move {
- let mut last_time = Utc::now();
- loop {
- tokio::select! {
- Some(message) = receiver.recv() => {
- worker_for_thread.handler(message).await;
- last_time = Utc::now();
- }
- _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {}
- }
- if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
- || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
- {
- break;
- }
- }
- });
- sender
- }
- /// Sends a message to be processed in another thread
- pub fn process(&self, message: W::Payload) {
- if self.worker.process_request() {
- let sender = self.sender.read();
- match sender
- .as_ref()
- .map(|sender| sender.try_send(message.clone()))
- {
- None | Some(Err(TrySendError::Closed(_))) => {
- drop(sender);
- let mut sender = self.sender.write();
- if let Some(sender) = sender.as_ref() {
- // Check if another faster thread did not set sender to Some already
- let _ = sender.try_send(message.clone());
- } else {
- // Either there is no running worker thread and it is closed already, and
- // this thread is fastest and therefore will start the background worker and
- // will set the sender to Some
- let new_worker = self.start_background_worker();
- let _ = new_worker.try_send(message);
- *sender = Some(new_worker);
- }
- }
- _ => {}
- }
- }
- }
- }
|