worker.rs 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. //! Creates a background worker and exposes a sender to communicate with it
  2. use async_trait::async_trait;
  3. use std::{ops::Deref, sync::Arc};
  4. use tokio::{
  5. sync::mpsc::{channel, Sender},
  6. task::JoinHandle,
  7. };
  /// Capacity of the bounded channel that feeds the background worker.
  ///
  /// Messages are enqueued with a non-blocking `try_send`, so once this many messages are
  /// waiting, further messages are discarded instead of being queued.
  const WORKER_BUFFER_SIZE: usize = 1_000;
  10. /// Worker trait
  11. ///
  12. /// The worker trait has the definition of the code the worker has to perform in a different thread
  13. #[async_trait]
  14. pub trait Worker: Send + Sync {
  15. type Payload: Send + Sync + Clone;
  16. /// Method to be executed with a given task
  17. async fn handle(&self, payload: Self::Payload);
  18. /// Whether or not to process the request
  19. fn process_request(&self) -> bool {
  20. true
  21. }
  22. }
  23. /// Worker manager
  24. ///
  25. /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
  26. /// in a separate thread from the send() context.
  27. ///
  28. /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
  29. #[derive(Debug)]
  30. pub struct WorkerManager<W: Worker> {
  31. sender: Sender<W::Payload>,
  32. worker_thread: JoinHandle<()>,
  33. worker: Arc<W>,
  34. }
  35. impl<W: Worker> Drop for WorkerManager<W> {
  36. fn drop(&mut self) {
  37. self.worker_thread.abort();
  38. }
  39. }
  40. impl<W: Worker> Deref for WorkerManager<W> {
  41. type Target = Arc<W>;
  42. fn deref(&self) -> &Self::Target {
  43. &self.worker
  44. }
  45. }
  46. impl<W: Worker + 'static> WorkerManager<W> {
  47. /// Creates a new WorkerManager given a struct that implements the Worker trait
  48. pub fn new(worker: W) -> Self {
  49. let worker = Arc::new(worker);
  50. let (worker_thread, sender) = WorkerManager::start_background_worker(worker.clone());
  51. Self {
  52. sender,
  53. worker_thread,
  54. worker,
  55. }
  56. }
  57. fn start_background_worker(worker: Arc<W>) -> (JoinHandle<()>, Sender<W::Payload>) {
  58. let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
  59. (
  60. tokio::spawn(async move {
  61. while let Some(message) = receiver.recv().await {
  62. worker.handle(message).await;
  63. }
  64. }),
  65. sender,
  66. )
  67. }
  68. /// Sends a message to be processed in another thread
  69. pub fn process(&self, message: W::Payload) {
  70. if self.worker.process_request() {
  71. let _ = self.sender.try_send(message);
  72. }
  73. }
  74. }