worker.rs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  //! Creates a worker thread and exposes a sender to communicate with it.
  2. use async_trait::async_trait;
  3. use chrono::Utc;
  4. use parking_lot::RwLock;
  5. use std::{
  6. ops::Deref,
  7. sync::{atomic::AtomicBool, Arc},
  8. time::Duration,
  9. };
  10. use tokio::{
  11. sync::mpsc::{channel, error::TrySendError, Sender},
  12. time::sleep,
  13. };
  /// Interval, in milliseconds, at which the background loop wakes up to check whether the parent
  /// struct has already been dropped. Once it has been dropped the loop has to stop.
  const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;

  /// Capacity of the channel used to buffer messages for the worker.
  const WORKER_BUFFER_SIZE: usize = 1_000;

  /// Longest time, in seconds, the background loop may stay idle without processing a request.
  /// After this is reached the loop is stopped.
  const MAXIMUM_IDLE_TIME_SEC: i64 = 60;
  22. /// Worker trait
  23. ///
  24. /// The worker trait has the definition of the code the worker has to perform in a different thread
  25. #[async_trait]
  26. pub trait Worker: Send + Sync {
  27. type Payload: Send + Sync + Clone;
  28. /// Method to be executed with a given task
  29. async fn handler(&self, payload: Self::Payload);
  30. /// Whether or not to process the request
  31. fn process_request(&self) -> bool {
  32. true
  33. }
  34. }
  35. /// Worker manager
  36. ///
  37. /// The worker manager manages the instances of the Worker trait, which is executed asynchronously
  38. /// in a separate thread from the send() context.
  39. ///
  40. /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
  41. #[derive(Debug)]
  42. pub struct WorkerManager<W: Worker> {
  43. sender: RwLock<Option<Sender<W::Payload>>>,
  44. is_running: Arc<AtomicBool>,
  45. worker: Arc<W>,
  46. }
  47. impl<W: Worker> Drop for WorkerManager<W> {
  48. fn drop(&mut self) {
  49. self.is_running
  50. .store(false, std::sync::atomic::Ordering::Release);
  51. }
  52. }
  53. impl<W: Worker> Deref for WorkerManager<W> {
  54. type Target = Arc<W>;
  55. fn deref(&self) -> &Self::Target {
  56. &self.worker
  57. }
  58. }
  59. impl<W: Worker + 'static> WorkerManager<W> {
  60. /// Creates a new WorkerManager given a struct that implements the Worker trait
  61. pub fn new(worker: W) -> Self {
  62. Self {
  63. sender: RwLock::new(None),
  64. is_running: Arc::new(true.into()),
  65. worker: Arc::new(worker),
  66. }
  67. }
  68. fn start_background_worker(&self) -> Sender<W::Payload> {
  69. let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
  70. let worker_for_thread = self.worker.clone();
  71. let worker_in_scope = self.is_running.clone();
  72. tokio::spawn(async move {
  73. let mut last_time = Utc::now();
  74. loop {
  75. tokio::select! {
  76. Some(message) = receiver.recv() => {
  77. worker_for_thread.handler(message).await;
  78. last_time = Utc::now();
  79. }
  80. _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS)) => {}
  81. }
  82. if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
  83. || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
  84. {
  85. break;
  86. }
  87. }
  88. });
  89. sender
  90. }
  91. /// Sends a message to be processed in another thread
  92. pub fn process(&self, message: W::Payload) {
  93. if self.worker.process_request() {
  94. let sender = self.sender.read();
  95. match sender
  96. .as_ref()
  97. .map(|sender| sender.try_send(message.clone()))
  98. {
  99. None | Some(Err(TrySendError::Closed(_))) => {
  100. drop(sender);
  101. let mut sender = self.sender.write();
  102. if let Some(sender) = sender.as_ref() {
  103. // Check if another faster thread did not set sender to Some already
  104. let _ = sender.try_send(message.clone());
  105. } else {
  106. // Either there is no running worker thread and it is closed already, and
  107. // this thread is fastest and therefore will start the background worker and
  108. // will set the sender to Some
  109. let new_worker = self.start_background_worker();
  110. let _ = new_worker.try_send(message);
  111. *sender = Some(new_worker);
  112. }
  113. }
  114. _ => {}
  115. }
  116. }
  117. }
  118. }