worker.rs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. //! Creates a worker thread and exposes a sender to communicate with it
  2. use async_trait::async_trait;
  3. use std::{
  4. ops::Deref,
  5. sync::{atomic::AtomicBool, Arc},
  6. time::Duration,
  7. };
  8. use tokio::{
  9. sync::mpsc::{channel, Sender},
  10. time::sleep,
  11. };
  /// Interval, in milliseconds, at which the background worker loop wakes up to check whether
  /// its `WorkerManager` is still alive.
  const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;

  /// Capacity of the bounded channel that queues payloads for the worker.
  const WORKER_BUFFER_SIZE: usize = 1000;
  14. /// Worker trait
  15. ///
  16. /// The worker trait has the definition of the code the worker has to perform in a different thread
  17. #[async_trait]
  18. pub trait Worker: Send + Sync {
  19. type Payload: Send + Sync;
  20. /// Method to be executed with a given task
  21. async fn handler(&self, payload: Self::Payload);
  22. }
  23. /// Worker manager
  24. ///
  25. /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
  26. /// in a separate thread from the send() context.
  27. ///
  28. /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
  29. ///
  30. /// TODO:
  31. /// * Boot the spawn thread on demand (right now it is at instantiation time)
  32. /// * Respawn on the worker thread need (while the Worker is in memory / not dropped)
  33. /// * Kill off the worker thread after a long time without any activity. This will reboot (step 1)
  34. /// when needed
  35. #[derive(Debug)]
  36. pub struct WorkerManager<W: Worker> {
  37. sender: Sender<W::Payload>,
  38. is_running: Arc<AtomicBool>,
  39. worker: Arc<W>,
  40. }
  41. impl<W: Worker> Drop for WorkerManager<W> {
  42. fn drop(&mut self) {
  43. self.is_running
  44. .store(false, std::sync::atomic::Ordering::Release);
  45. }
  46. }
  47. impl<W: Worker> Deref for WorkerManager<W> {
  48. type Target = Arc<W>;
  49. fn deref(&self) -> &Self::Target {
  50. &self.worker
  51. }
  52. }
  53. impl<W: Worker + 'static> WorkerManager<W> {
  54. /// Creates a new WorkerManager given a struct that implements the Worker trait
  55. pub fn new(worker: W) -> Self {
  56. let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
  57. let worker = Arc::new(worker);
  58. let is_running = Arc::new(AtomicBool::new(true));
  59. let worker_for_thread = worker.clone();
  60. let worker_in_scope = is_running.clone();
  61. tokio::spawn(async move {
  62. loop {
  63. tokio::select! {
  64. Some(message) = receiver.recv() => {
  65. worker_for_thread.handler(message).await
  66. }
  67. _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {}
  68. }
  69. if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire) {
  70. break;
  71. }
  72. }
  73. });
  74. Self {
  75. sender,
  76. is_running,
  77. worker,
  78. }
  79. }
  80. /// Sends a message to be processed in another thread
  81. pub fn send(&self, message: W::Payload) {
  82. self.sender.try_send(message).expect("foo");
  83. }
  84. }