Made process() sync and use RwLock (from parking_lot) instead of a Mutex

Cesar Rodas 10 months ago
parent
commit
14e39a31d9
8 changed files with 583 additions and 30 deletions
  1. Cargo.lock  +5 -4
  2. TODO.md  +11 -10
  3. src/main.rs  +1 -0
  4. utxo/Cargo.toml  +1 -0
  5. utxo/main.rs  +540 -0
  6. utxo/src/broadcaster.rs  +1 -1
  7. utxo/src/ledger.rs  +1 -1
  8. utxo/src/worker.rs  +23 -14
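
The behavioural change is concentrated in utxo/src/worker.rs: WorkerManager::process() is now a plain synchronous method, and the sender is guarded by a parking_lot::RwLock instead of a tokio Mutex, taking the cheap read lock on the fast path and re-checking under the write lock only when the worker channel has to be (re)started. A minimal, self-contained sketch of that pattern (hypothetical Manager type and String payload, not the crate's actual WorkerManager; it assumes a running Tokio runtime):

    use parking_lot::RwLock;
    use tokio::sync::mpsc::{channel, error::TrySendError, Sender};

    // Hypothetical, simplified illustration of the read-then-write pattern:
    // the common case only takes the read lock; the write lock is taken only
    // when there is no live worker channel, and the Option is re-checked under
    // the write lock in case another thread restarted the worker first.
    struct Manager {
        sender: RwLock<Option<Sender<String>>>,
    }

    impl Manager {
        fn start_background_worker(&self) -> Sender<String> {
            let (tx, mut rx) = channel::<String>(16);
            // Requires an active Tokio runtime.
            tokio::spawn(async move {
                while let Some(msg) = rx.recv().await {
                    println!("processing {msg}");
                }
            });
            tx
        }

        // Synchronous: no .await, so callers do not need to be async.
        fn process(&self, message: String) {
            let sender = self.sender.read();
            match sender.as_ref().map(|s| s.try_send(message.clone())) {
                None | Some(Err(TrySendError::Closed(_))) => {
                    drop(sender);
                    let mut sender = self.sender.write();
                    if let Some(s) = sender.as_ref() {
                        // Another thread already restarted the worker.
                        let _ = s.try_send(message);
                    } else {
                        let tx = self.start_background_worker();
                        let _ = tx.try_send(message);
                        *sender = Some(tx);
                    }
                }
                _ => {}
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let m = Manager { sender: RwLock::new(None) };
        m.process("hello".to_owned());
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

The worker.rs hunk at the bottom of this commit is the real version of this, with the extra is_running bookkeeping and the Worker trait's process_request() gate.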

+ 5 - 4
Cargo.lock

@@ -1111,7 +1111,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f"
 dependencies = [
  "futures-core",
  "lock_api",
- "parking_lot 0.12.1",
+ "parking_lot 0.12.2",
 ]
 
 [[package]]
@@ -1906,9 +1906,9 @@ dependencies = [
 
 [[package]]
 name = "parking_lot"
-version = "0.12.1"
+version = "0.12.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
+checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
 dependencies = [
  "lock_api",
  "parking_lot_core 0.9.8",
@@ -3049,7 +3049,7 @@ dependencies = [
  "libc",
  "mio 0.8.8",
  "num_cpus",
- "parking_lot 0.12.1",
+ "parking_lot 0.12.2",
  "pin-project-lite 0.2.13",
  "signal-hook-registry",
  "socket2 0.5.4",
@@ -3263,6 +3263,7 @@ dependencies = [
  "borsh",
  "chrono",
  "futures",
+ "parking_lot 0.12.2",
  "rand 0.8.5",
  "serde",
  "sha2",

+ 11 - 10
TODO.md

@@ -1,10 +1,11 @@
-- [x] Optimize `select_inputs_from_accounts` to return a single change operation instead of a vector
-- [x] Write cache layer on top of the storage layer, specially if accounts are settled
-- [ ] Improve read performance with SQLite
-- [ ] Add a locking mechanism, to either a start a tx per account, or use the storage engine as a lock mechanism (to lock the utxos)
-- [ ] Add ability to query accounts in a point in time
-- [ ] Write other servers, other than the restful server
-- [ ] Add caching layer: This cache layer can built on top of the utxo::ledger, because all operations can be safely cached until a new transaction referencing their account is issued, by that point, all the caches related to anaccount can be evicted
-- [ ] Build admin interface
-- [ ] Add memo to changes. Build append only table with all movements as
-      inserts. Wraps the objects to all their changes
+- [x] Improve subscription API to detect when the client drops and stop listening
+- [ ] Make client-drop detection reactive, instead of waiting until an attempted delivery fails.
+- [ ] Implement the filter by status
+- [ ] Implement the filter by since / until (this is implemented at the primary key level at the cursor)
+- [ ] Have more tests
+- [ ] Move the broadcasting of events to a different thread
+- [ ] Stop the working thread when the ledger goes out of scope
+
+Bonus
+
+- [ ] Improve the HashMap of Filter to use the filter much more efficiently with the primary key (similar to the cursor)

+ 1 - 0
src/main.rs

@@ -34,6 +34,7 @@ impl Deposit {
                 &self.account,
                 self.amount.try_into()?,
                 self.status,
+                vec![],
                 self.memo,
             )
             .await?;

+ 1 - 0
utxo/Cargo.toml

@@ -9,6 +9,7 @@ bech32 = "0.11.0"
 borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
+parking_lot = "0.12.2"
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"
 sqlx = { version = "0.7.1", features = [

+ 540 - 0
utxo/main.rs

@@ -0,0 +1,540 @@
+use futures::executor::block_on;
+use mlua::{Compiler, Lua, Table, Value};
+use std::{
+    collections::HashMap,
+    hash::Hash,
+    sync::{
+        atomic::{AtomicU16, AtomicUsize, Ordering},
+        Arc,
+    },
+};
+use tokio::{
+    sync::{mpsc, oneshot, Mutex, RwLock},
+    time::{timeout, Duration},
+};
+
+#[async_trait::async_trait]
+pub trait VarStorage: Send + Sync {
+    async fn get(&self, instance: usize, var: Variable) -> VarValue;
+
+    async fn set(&self, instance: usize, var: Variable, value: VarValue);
+
+    async fn shutdown(&self, instance: usize);
+}
+
+type Sender<I, R> = mpsc::Sender<(Vec<I>, oneshot::Sender<R>)>;
+type Receiver<I, R> = mpsc::Receiver<(Vec<I>, oneshot::Sender<R>)>;
+
+#[derive(Debug)]
+pub struct Program<X>
+where
+    X: VarStorage + 'static,
+{
+    opcodes: Vec<u8>,
+    instances: Arc<AtomicUsize>,
+    running: Arc<AtomicU16>,
+    execution_id: Arc<AtomicUsize>,
+    storage: Arc<X>,
+    sender: Sender<VarValue, VarValue>,
+    receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
+}
+
+#[derive(Debug, Clone)]
+pub enum VarValue {
+    /// The Lua value `nil`.
+    Nil,
+    /// The Lua value `true` or `false`.
+    Boolean(bool),
+    /// Integer number
+    Integer(i128),
+    /// A floating point number.
+    Number(f64),
+    /// String
+    String(String),
+    /// A vector
+    Vector(Vec<VarValue>),
+    /// A hash map
+    HashMap(HashMap<String, VarValue>),
+    /// An error
+    ErrorType(String),
+}
+
+pub enum Variable {
+    Balances,
+    Accounts,
+    Transactions,
+    Payments,
+    Other(String),
+}
+
+impl From<String> for Variable {
+    fn from(s: String) -> Self {
+        match s.as_str() {
+            "balances" => Self::Balances,
+            "accounts" => Self::Accounts,
+            "transactions" => Self::Transactions,
+            "payments" => Self::Payments,
+            _ => Self::Other(s),
+        }
+    }
+}
+
+impl Variable {
+    pub fn name<'a>(&'a self) -> &'a str {
+        match self {
+            Self::Balances => "balances",
+            Self::Accounts => "accounts",
+            Self::Transactions => "transactions",
+            Self::Payments => "payments",
+            Self::Other(s) => s,
+        }
+    }
+}
+
+impl<X> Program<X>
+where
+    X: VarStorage + 'static,
+{
+    pub fn new(opcodes: Vec<u8>, storage: Arc<X>) -> Program<X> {
+        let (sender, receiver) = mpsc::channel(100);
+        Self {
+            storage,
+            instances: Arc::new(AtomicUsize::new(0)),
+            execution_id: Arc::new(AtomicUsize::new(0)),
+            running: Arc::new(AtomicU16::new(0)),
+            receiver: Arc::new(Mutex::new(receiver)),
+            opcodes,
+            sender,
+        }
+    }
+
+    fn var_value_to_lua_val(lua: &Lua, value: VarValue) -> mlua::Result<Value> {
+        match value {
+            VarValue::Nil => Ok(Value::Nil),
+            VarValue::Boolean(b) => Ok(Value::Boolean(b)),
+            VarValue::Integer(i) => Ok(Value::Integer(i.try_into().unwrap())),
+            VarValue::Number(n) => Ok(Value::Number(n)),
+            VarValue::String(s) => Ok(Value::String(lua.create_string(&s)?)),
+            VarValue::HashMap(map) => {
+                let table = lua.create_table()?;
+                for (k, v) in map {
+                    table.set(k, Self::var_value_to_lua_val(lua, v)?)?;
+                }
+                Ok(Value::Table(table))
+            }
+            VarValue::ErrorType(e) => Err(mlua::Error::RuntimeError(e.to_string())),
+            _ => Err(mlua::Error::RuntimeError("Invalid type".into())),
+        }
+    }
+
+    fn inject_dynamic_global_state(
+        lua: &Lua,
+        storage: Arc<X>,
+        instance: usize,
+    ) -> mlua::Result<Option<Table>> {
+        lua.set_app_data(storage);
+
+        let getter = lua.create_function(move |lua, (global, key): (Table, String)| {
+            match global.raw_get::<_, Value>(key.clone())?.into() {
+                Value::Nil => (),
+                local_value => return Ok(local_value),
+            };
+            let storage = lua
+                .app_data_ref::<Arc<X>>()
+                .ok_or(mlua::Error::MismatchedRegistryKey)?
+                .clone();
+            let value = block_on(async move { storage.get(instance, key.into()).await });
+            Self::var_value_to_lua_val(lua, value)
+        })?;
+        let setter =
+            lua.create_function(move |lua, (global, key, value): (Table, String, Value)| {
+                let storage = lua
+                    .app_data_ref::<Arc<X>>()
+                    .ok_or(mlua::Error::MismatchedRegistryKey)?
+                    .clone();
+                let value: VarValue = if let Ok(value) = value.as_ref().try_into() {
+                    value
+                } else {
+                    return global.raw_set(key, value);
+                };
+                block_on(async move {
+                    storage.set(instance, key.into(), value).await;
+                    Ok(())
+                })
+            })?;
+
+        let metatable = lua.create_table()?;
+        metatable.raw_set("__index", getter)?;
+        metatable.raw_set("__newindex", setter)?;
+
+        Ok(Some(metatable))
+    }
+
+    /// Returns a new Lua VM and a list of all the global variables to be
+    /// persisted and read from the storage engine. Since lua is a dynamic
+    /// language, other state variables may be read/updated dynamically, which
+    /// is fine, this list is just for the initial state and any potential
+    /// optimization.
+    fn execute_program(state: Arc<X>, instance: usize, bytecode: &[u8]) -> mlua::Result<VarValue> {
+        let lua = Lua::new();
+        let globals = lua.globals();
+
+        let require = lua.create_function(|_, (_,): (String,)| -> mlua::Result<()> {
+            Err(mlua::Error::RuntimeError("require is not allowed".into()))
+        })?;
+
+        globals.set_metatable(Self::inject_dynamic_global_state(
+            &lua,
+            state.clone(),
+            instance,
+        )?);
+        lua.set_memory_limit(100 * 1024 * 1024)?;
+
+        // remove external require
+        globals.set("require", require)?;
+        drop(globals);
+
+        // load main program
+        let x: Value = lua.load(bytecode).call(())?;
+
+        // shutdown the execution and let the storage / state engine know so all
+        // locked variables by this execution_id can be released
+        block_on(async move {
+            state.shutdown(instance).await;
+        });
+
+        x.as_ref().try_into().map_err(|_| mlua::Error::StackError)
+    }
+
+    fn spawn(
+        storage: Arc<X>,
+        bytecode: Vec<u8>,
+        instances: Arc<AtomicUsize>,
+        exec_id: Arc<AtomicUsize>,
+        running: Arc<AtomicU16>,
+        receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
+    ) {
+        if instances.load(Ordering::Relaxed) > 100 {
+            return;
+        }
+
+        instances.fetch_add(1, Ordering::Relaxed);
+        let max_timeout = Duration::from_secs(30);
+
+        tokio::task::spawn_blocking(move || {
+            loop {
+                if let Ok(mut queue) =
+                    futures::executor::block_on(timeout(max_timeout, receiver.lock()))
+                {
+                    if let Ok(Some((_inputs, output))) =
+                        futures::executor::block_on(timeout(max_timeout, queue.recv()))
+                    {
+                        let exec_id: usize = exec_id.fetch_add(1, Ordering::Relaxed);
+                        // drop queue lock to release the mutex so any other
+                        // free VM can use it to listen for incoming messages
+                        drop(queue);
+
+                        let ret =
+                            Self::execute_program(storage.clone(), exec_id, &bytecode).unwrap();
+
+                        running.fetch_add(1, Ordering::Relaxed);
+                        let _ = output.send(ret).unwrap();
+                        continue;
+                    }
+                }
+                break;
+            }
+
+            println!("Lua listener is exiting");
+            instances.fetch_sub(1, Ordering::Relaxed);
+        });
+    }
+
+    pub async fn exec(&self, input: Vec<VarValue>) -> VarValue {
+        let (return_notifier, return_listener) = oneshot::channel();
+        Self::spawn(
+            self.storage.clone(),
+            self.opcodes.clone(),
+            self.instances.clone(),
+            self.execution_id.clone(),
+            self.running.clone(),
+            self.receiver.clone(),
+        );
+        self.sender
+            .send((input, return_notifier))
+            .await
+            .expect("valid");
+        return_listener.await.expect("valid")
+    }
+}
+
+impl TryFrom<&Value<'_>> for VarValue {
+    type Error = String;
+
+    fn try_from(value: &Value<'_>) -> Result<Self, Self::Error> {
+        match value {
+            Value::Nil => Ok(VarValue::Nil),
+            Value::Boolean(b) => Ok(VarValue::Boolean(*b)),
+            Value::Integer(i) => Ok(VarValue::Integer((*i).into())),
+            Value::Number(n) => Ok(VarValue::Number(*n)),
+            Value::String(s) => Ok(VarValue::String(s.to_str().unwrap().to_owned())),
+            Value::Table(t) => {
+                let mut map = HashMap::new();
+                let mut iter = t.clone().pairs::<String, Value>().enumerate();
+                let mut is_vector = true;
+                while let Some((id, Ok((k, v)))) = iter.next() {
+                    if Ok(id + 1) != k.parse() {
+                        is_vector = false;
+                    }
+                    map.insert(k, v.as_ref().try_into()?);
+                }
+
+                Ok(if is_vector {
+                    let mut values = map
+                        .into_iter()
+                        .map(|(k, v)| k.parse().map(|k| (k, v)))
+                        .collect::<Result<Vec<(usize, VarValue)>, _>>()
+                        .unwrap();
+
+                    values.sort_by(|(a, _), (b, _)| a.cmp(b));
+
+                    VarValue::Vector(values.into_iter().map(|(_, v)| v).collect())
+                } else {
+                    VarValue::HashMap(map)
+                })
+            }
+            x => Err(format!("Invalid type: {:?}", x)),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct VarStorageMem {
+    storage: RwLock<HashMap<String, VarValue>>,
+    locks: RwLock<HashMap<String, usize>>,
+    var_locked_by_instance: Mutex<HashMap<usize, Vec<String>>>,
+}
+
+impl Default for VarStorageMem {
+    fn default() -> Self {
+        Self {
+            storage: RwLock::new(HashMap::new()),
+            locks: RwLock::new(HashMap::new()),
+            var_locked_by_instance: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl VarStorageMem {
+    async fn lock(&self, instance: usize, var: &Variable) {
+        let locks = self.locks.read().await;
+        let name = var.name();
+
+        if locks.get(name).map(|v| *v) == Some(instance) {
+            // The variable is already locked by this instance
+            return;
+        }
+
+        drop(locks);
+
+        loop {
+            // wait here while the variable is still locked by another
+            // instance
+            let locks = self.locks.read().await;
+            let var_lock = locks.get(name).map(|v| *v);
+            if var_lock.is_none() || var_lock == Some(instance) {
+                break;
+            }
+            drop(locks);
+            tokio::time::sleep(Duration::from_micros(10)).await;
+        }
+
+        loop {
+            let mut locks = self.locks.write().await;
+            let var_lock = locks.get(name).map(|v| *v);
+
+            if !var_lock.is_none() {
+                if var_lock == Some(instance) {
+                    break;
+                }
+                drop(locks);
+                tokio::time::sleep(Duration::from_micros(10)).await;
+                continue;
+            }
+
+            locks.insert(name.to_owned(), instance);
+
+            let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+            if let Some(vars) = vars_by_instance.get_mut(&instance) {
+                vars.push(name.to_owned());
+            } else {
+                vars_by_instance.insert(instance, vec![name.to_owned()]);
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl VarStorage for VarStorageMem {
+    async fn get(&self, instance: usize, var: Variable) -> VarValue {
+        self.lock(instance, &var).await;
+        self.storage
+            .read()
+            .await
+            .get(var.name())
+            .cloned()
+            .unwrap_or(VarValue::Nil)
+    }
+
+    async fn set(&self, instance: usize, var: Variable, value: VarValue) {
+        self.lock(instance, &var).await;
+        self.storage
+            .write()
+            .await
+            .insert(var.name().to_owned(), value);
+    }
+
+    async fn shutdown(&self, instance: usize) {
+        let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+        let mut locks = self.locks.write().await;
+
+        if let Some(vars) = vars_by_instance.remove(&instance) {
+            for var in vars {
+                if locks.get(&var).map(|v| *v) == Some(instance) {
+                    locks.remove(&var);
+                }
+            }
+        }
+    }
+}
+
+pub struct Runtime<K: Hash + Eq, X: VarStorage + 'static> {
+    vms: RwLock<HashMap<K, Program<X>>>,
+}
+
+impl<K: Hash + Eq, X: VarStorage + 'static> Runtime<K, X> {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self {
+            vms: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub async fn register_program(&self, name: K, program: &str, storage: Arc<X>) -> bool {
+        self.register_opcodes(name, Compiler::new().compile(program), storage)
+            .await
+    }
+
+    pub async fn exec(&self, id: &K) -> Option<VarValue> {
+        if let Some(vm) = self.vms.read().await.get(id) {
+            Some(vm.exec(vec![VarValue::Integer(1)]).await)
+        } else {
+            None
+        }
+    }
+
+    pub async fn register_opcodes(&self, name: K, opcodes: Vec<u8>, storage: Arc<X>) -> bool {
+        let mut vms = self.vms.write().await;
+
+        vms.insert(name, Program::new(opcodes, storage)).is_some()
+    }
+
+    pub async fn shutdown(&self) {
+        let mut vms = self.vms.write().await;
+        vms.clear();
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    use std::{sync::Arc, time::Instant};
+
+    let mem = Arc::new(VarStorageMem::default());
+
+    async fn do_loop(vms: Arc<Runtime<String, VarStorageMem>>) {
+        // Create N threads to execute the Lua code in parallel
+        let num_threads = 400;
+        let (tx, mut rx) = mpsc::channel(num_threads);
+        for _ in 0..num_threads {
+            let vm = vms.clone();
+            let tx_clone = tx.clone();
+
+            tokio::spawn(async move {
+                let start_time = Instant::now();
+                let result = vm.exec(&"foo".to_owned()).await;
+
+                // Send the result back to the main thread
+                let _ = tx_clone.send(result).await;
+
+                let elapsed_time = Instant::now() - start_time;
+
+                // Print the elapsed time in seconds and milliseconds
+                println!(
+                    "Elapsed time: {} seconds {} milliseconds",
+                    elapsed_time.as_secs(),
+                    elapsed_time.as_millis(),
+                );
+            });
+        }
+
+        drop(tx);
+
+        loop {
+            let result = rx.recv().await;
+            if result.is_none() {
+                break;
+            }
+            println!("Result: {:?}", result.unwrap());
+        }
+    }
+
+    let vms = Runtime::new();
+
+    // Compile Lua code
+    let _code = r#"
+        function add(a, b)
+            calls = calls + 1
+            print("Call from old " .. pid .. " "  .. calls)
+            return a + b
+        end
+        print("hello world " .. pid)
+    "#;
+    let code = r#"
+        if pid == nil then
+            pid = 0
+        end
+        pid = pid + 1
+        print("hello world " .. pid)
+        return true
+    "#;
+
+    let _ = vms
+        .register_program("foo".to_owned(), code, mem.clone())
+        .await;
+    do_loop(vms.clone()).await;
+
+    let code = r#"
+        if pid == nil then
+            pid = 0
+        end
+        pid = pid + 1
+        function add(a, b)
+            foo = {1,"foo"}
+            print("Call from new " .. pid .. " ")
+            return a + b
+        end
+        print("hello world " .. pid .. " = " .. add(pid, pid))
+        return false
+    "#;
+    let y = vms
+        .register_program("foo".to_owned(), code, mem.clone())
+        .await;
+    tokio::time::sleep(Duration::from_secs(3)).await;
+
+    do_loop(vms.clone()).await;
+
+    vms.shutdown().await;
+
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    println!("{} {:?}", "foo", mem);
+}
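
As a side note on the trait defined at the top of this file: VarStorage only requires the three async methods shown (get, set, shutdown). A minimal alternative backend, sketched here purely for illustration (it reuses the VarValue, Variable and VarStorage items from this file and deliberately skips the per-instance locking that VarStorageMem performs):

    use std::collections::HashMap;
    use tokio::sync::RwLock;

    // Hypothetical example backend: one shared map, no per-variable locks,
    // so shutdown() has nothing to release.
    #[derive(Default)]
    pub struct SharedVars {
        vars: RwLock<HashMap<String, VarValue>>,
    }

    #[async_trait::async_trait]
    impl VarStorage for SharedVars {
        async fn get(&self, _instance: usize, var: Variable) -> VarValue {
            self.vars
                .read()
                .await
                .get(var.name())
                .cloned()
                .unwrap_or(VarValue::Nil)
        }

        async fn set(&self, _instance: usize, var: Variable, value: VarValue) {
            self.vars.write().await.insert(var.name().to_owned(), value);
        }

        async fn shutdown(&self, _instance: usize) {
            // No per-instance state to clean up in this simplified backend.
        }
    }

Whether a backend needs VarStorageMem-style locking depends on whether concurrent executions may touch the same variables; the simplified store above just lets the last writer win.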

+ 1 - 1
utxo/src/broadcaster.rs

@@ -66,7 +66,7 @@ impl Broadcaster {
 impl Worker for Broadcaster {
     type Payload = Transaction;
 
-    async fn process_request(&self) -> bool {
+    fn process_request(&self) -> bool {
         self.is_there_any_subscriber
             .load(std::sync::atomic::Ordering::Acquire)
     }

+ 1 - 1
utxo/src/ledger.rs

@@ -178,7 +178,7 @@ where
     /// instead of having them at the transaction layer
     async fn persist(&self, mut transaction: Transaction) -> Result<Transaction, Error> {
         transaction.persist(&self.config).await?;
-        self.brodcaster.process(transaction.clone()).await;
+        self.brodcaster.process(transaction.clone());
         Ok(transaction)
     }
 

+ 23 - 14
utxo/src/worker.rs

@@ -1,16 +1,14 @@
 //! Creates a worker thread and exposes a sender to communicate with it
 use async_trait::async_trait;
 use chrono::Utc;
+use parking_lot::RwLock;
 use std::{
     ops::Deref,
     sync::{atomic::AtomicBool, Arc},
     time::Duration,
 };
 use tokio::{
-    sync::{
-        mpsc::{channel, error::TrySendError, Sender},
-        Mutex,
-    },
+    sync::mpsc::{channel, error::TrySendError, Sender},
     time::sleep,
 };
 
@@ -34,7 +32,7 @@ pub trait Worker: Send + Sync {
     async fn handler(&self, payload: Self::Payload);
 
     /// Whether or not to process the request
-    async fn process_request(&self) -> bool {
+    fn process_request(&self) -> bool {
         true
     }
 }
@@ -47,7 +45,7 @@ pub trait Worker: Send + Sync {
 /// The logic of having one or more instances of the Worker trait is abstracted in this structure.
 #[derive(Debug)]
 pub struct WorkerManager<W: Worker> {
-    sender: Mutex<Option<Sender<W::Payload>>>,
+    sender: RwLock<Option<Sender<W::Payload>>>,
     is_running: Arc<AtomicBool>,
     worker: Arc<W>,
 }
@@ -71,7 +69,7 @@ impl<W: Worker + 'static> WorkerManager<W> {
     /// Creates a new WorkerManager given a struct that implements the Worker trait
     pub fn new(worker: W) -> Self {
         Self {
-            sender: Mutex::new(None),
+            sender: RwLock::new(None),
             is_running: Arc::new(true.into()),
             worker: Arc::new(worker),
         }
@@ -103,18 +101,29 @@ impl<W: Worker + 'static> WorkerManager<W> {
     }
 
     /// Sends a message to be processed in another thread
-    pub async fn process(&self, message: W::Payload) {
-        if self.worker.process_request().await {
-            let mut sender = self.sender.lock().await;
+    pub fn process(&self, message: W::Payload) {
+        if self.worker.process_request() {
+            let sender = self.sender.read();
             match sender
                 .as_ref()
                 .map(|sender| sender.try_send(message.clone()))
             {
                 None | Some(Err(TrySendError::Closed(_))) => {
-                    // Either there is no running worker thread and it is closed already
-                    let new_worker = self.start_background_worker();
-                    let _ = new_worker.try_send(message);
-                    *sender = Some(new_worker);
+                    drop(sender);
+
+                    let mut sender = self.sender.write();
+
+                    if let Some(sender) = sender.as_ref() {
+                        // Another, faster thread already set the sender to Some
+                        let _ = sender.try_send(message.clone());
+                    } else {
+                        // There is no running worker thread (or the previous one is already
+                        // closed) and this thread won the race, so it starts the background
+                        // worker and sets the sender to Some
+                        let new_worker = self.start_background_worker();
+                        let _ = new_worker.try_send(message);
+                        *sender = Some(new_worker);
+                    }
                 }
                 _ => {}
             }