12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- //! Creates a worker thread and exposes a sender to communicate with it
- use async_trait::async_trait;
- use std::{ops::Deref, sync::Arc};
- use tokio::{
- sync::mpsc::{channel, Sender},
- task::JoinHandle,
- };
/// Capacity of the bounded channel that queues payloads for the background worker.
const WORKER_BUFFER_SIZE: usize = 1_000;
- /// Worker trait
- ///
- /// The worker trait has the definition of the code the worker has to perform in a different thread
- #[async_trait]
- pub trait Worker: Send + Sync {
- type Payload: Send + Sync + Clone;
- /// Method to be executed with a given task
- async fn handle(&self, payload: Self::Payload);
- /// Whether or not to process the request
- fn process_request(&self) -> bool {
- true
- }
- }
- /// Worker manager
- ///
- /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
- /// in a separate thread from the send() context.
- ///
- /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
- #[derive(Debug)]
- pub struct WorkerManager<W: Worker> {
- sender: Sender<W::Payload>,
- worker_thread: JoinHandle<()>,
- worker: Arc<W>,
- }
- impl<W: Worker> Drop for WorkerManager<W> {
- fn drop(&mut self) {
- self.worker_thread.abort();
- }
- }
- impl<W: Worker> Deref for WorkerManager<W> {
- type Target = Arc<W>;
- fn deref(&self) -> &Self::Target {
- &self.worker
- }
- }
- impl<W: Worker + 'static> WorkerManager<W> {
- /// Creates a new WorkerManager given a struct that implements the Worker trait
- pub fn new(worker: W) -> Self {
- let worker = Arc::new(worker);
- let (worker_thread, sender) = WorkerManager::start_background_worker(worker.clone());
- Self {
- sender,
- worker_thread,
- worker,
- }
- }
- fn start_background_worker(worker: Arc<W>) -> (JoinHandle<()>, Sender<W::Payload>) {
- let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
- (
- tokio::spawn(async move {
- while let Some(message) = receiver.recv().await {
- worker.handle(message).await;
- }
- }),
- sender,
- )
- }
- /// Sends a message to be processed in another thread
- pub fn process(&self, message: W::Payload) {
- if self.worker.process_request() {
- let _ = self.sender.try_send(message);
- }
- }
- }
|