Browse Source

Merge pull request #3 from crodas/real-time-subscription

Working on having the subscription API
César D. Rodas 10 tháng trước cách đây
mục cha
commit
088174b3a0

+ 5 - 4
Cargo.lock

@@ -1111,7 +1111,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f"
 dependencies = [
 dependencies = [
  "futures-core",
  "futures-core",
  "lock_api",
  "lock_api",
- "parking_lot 0.12.1",
+ "parking_lot 0.12.2",
 ]
 ]
 
 
 [[package]]
 [[package]]
@@ -1906,9 +1906,9 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "parking_lot"
 name = "parking_lot"
-version = "0.12.1"
+version = "0.12.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
+checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb"
 dependencies = [
 dependencies = [
  "lock_api",
  "lock_api",
  "parking_lot_core 0.9.8",
  "parking_lot_core 0.9.8",
@@ -3049,7 +3049,7 @@ dependencies = [
  "libc",
  "libc",
  "mio 0.8.8",
  "mio 0.8.8",
  "num_cpus",
  "num_cpus",
- "parking_lot 0.12.1",
+ "parking_lot 0.12.2",
  "pin-project-lite 0.2.13",
  "pin-project-lite 0.2.13",
  "signal-hook-registry",
  "signal-hook-registry",
  "socket2 0.5.4",
  "socket2 0.5.4",
@@ -3263,6 +3263,7 @@ dependencies = [
  "borsh",
  "borsh",
  "chrono",
  "chrono",
  "futures",
  "futures",
+ "parking_lot 0.12.2",
  "rand 0.8.5",
  "rand 0.8.5",
  "serde",
  "serde",
  "sha2",
  "sha2",

+ 0 - 10
TODO.md

@@ -1,10 +0,0 @@
-- [x] Optimize `select_inputs_from_accounts` to return a single change operation instead of a vector
-- [x] Write cache layer on top of the storage layer, specially if accounts are settled
-- [ ] Improve read performance with SQLite
-- [ ] Add a locking mechanism, to either a start a tx per account, or use the storage engine as a lock mechanism (to lock the utxos)
-- [ ] Add ability to query accounts in a point in time
-- [ ] Write other servers, other than the restful server
-- [ ] Add caching layer: This cache layer can built on top of the utxo::ledger, because all operations can be safely cached until a new transaction referencing their account is issued, by that point, all the caches related to anaccount can be evicted
-- [ ] Build admin interface
-- [ ] Add memo to changes. Build append only table with all movements as
-      inserts. Wraps the objects to all their changes

+ 4 - 1
src/main.rs

@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use actix_web::{
 use actix_web::{
     error::InternalError, get, middleware::Logger, post, web, App, HttpResponse, HttpServer,
     error::InternalError, get, middleware::Logger, post, web, App, HttpResponse, HttpServer,
     Responder,
     Responder,
@@ -34,6 +36,7 @@ impl Deposit {
                 &self.account,
                 &self.account,
                 self.amount.try_into()?,
                 self.amount.try_into()?,
                 self.status,
                 self.status,
+                vec![],
                 self.memo,
                 self.memo,
             )
             )
             .await?;
             .await?;
@@ -247,7 +250,7 @@ async fn update_status(
 }
 }
 
 
 pub struct Ledger {
 pub struct Ledger {
-    _inner: verax::Ledger<verax::storage::Cache<verax::storage::SQLite>>,
+    _inner: Arc<verax::Ledger<verax::storage::Cache<verax::storage::SQLite>>>,
 }
 }
 
 
 #[actix_web::main]
 #[actix_web::main]

+ 1 - 0
utxo/Cargo.toml

@@ -9,6 +9,7 @@ bech32 = "0.11.0"
 borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 borsh = { version = "1.3.1", features = ["derive", "bytes", "de_strict_order"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 chrono = { version = "0.4.31", features = ["serde"] }
 futures = { version = "0.3.30", optional = true }
 futures = { version = "0.3.30", optional = true }
+parking_lot = "0.12.2"
 serde = { version = "1.0.188", features = ["derive"] }
 serde = { version = "1.0.188", features = ["derive"] }
 sha2 = "0.10.7"
 sha2 = "0.10.7"
 sqlx = { version = "0.7.1", features = [
 sqlx = { version = "0.7.1", features = [

+ 540 - 0
utxo/main.rs

@@ -0,0 +1,540 @@
+use futures::executor::block_on;
+use mlua::{Compiler, Lua, Table, Value};
+use std::{
+    collections::HashMap,
+    hash::Hash,
+    sync::{
+        atomic::{AtomicU16, AtomicUsize, Ordering},
+        Arc,
+    },
+};
+use tokio::{
+    sync::{mpsc, oneshot, Mutex, RwLock},
+    time::{timeout, Duration},
+};
+
+#[async_trait::async_trait]
+pub trait VarStorage: Send + Sync {
+    async fn get(&self, instance: usize, var: Variable) -> VarValue;
+
+    async fn set(&self, instance: usize, var: Variable, value: VarValue);
+
+    async fn shutdown(&self, instance: usize);
+}
+
+type Sender<I, R> = mpsc::Sender<(Vec<I>, oneshot::Sender<R>)>;
+type Receiver<I, R> = mpsc::Receiver<(Vec<I>, oneshot::Sender<R>)>;
+
+#[derive(Debug)]
+pub struct Program<X>
+where
+    X: VarStorage + 'static,
+{
+    opcodes: Vec<u8>,
+    instances: Arc<AtomicUsize>,
+    running: Arc<AtomicU16>,
+    execution_id: Arc<AtomicUsize>,
+    storage: Arc<X>,
+    sender: Sender<VarValue, VarValue>,
+    receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
+}
+
+#[derive(Debug, Clone)]
+pub enum VarValue {
+    /// The Lua value `nil`.
+    Nil,
+    /// The Lua value `true` or `false`.
+    Boolean(bool),
+    /// Integer number
+    Integer(i128),
+    /// A floating point number.
+    Number(f64),
+    /// String
+    String(String),
+    /// A vector
+    Vector(Vec<VarValue>),
+    /// A
+    HashMap(HashMap<String, VarValue>),
+    /// An error
+    ErrorType(String),
+}
+
+pub enum Variable {
+    Balances,
+    Accounts,
+    Transactions,
+    Payments,
+    Other(String),
+}
+
+impl From<String> for Variable {
+    fn from(s: String) -> Self {
+        match s.as_str() {
+            "balances" => Self::Balances,
+            "accounts" => Self::Accounts,
+            "transactions" => Self::Transactions,
+            "payments" => Self::Payments,
+            _ => Self::Other(s),
+        }
+    }
+}
+
+impl Variable {
+    pub fn name<'a>(&'a self) -> &'a str {
+        match self {
+            Self::Balances => "balances",
+            Self::Accounts => "accounts",
+            Self::Transactions => "transactions",
+            Self::Payments => "payments",
+            Self::Other(s) => s,
+        }
+    }
+}
+
+impl<X> Program<X>
+where
+    X: VarStorage + 'static,
+{
+    pub fn new(opcodes: Vec<u8>, storage: Arc<X>) -> Program<X> {
+        let (sender, receiver) = mpsc::channel(100);
+        Self {
+            storage,
+            instances: Arc::new(AtomicUsize::new(0)),
+            execution_id: Arc::new(AtomicUsize::new(0)),
+            running: Arc::new(AtomicU16::new(0)),
+            receiver: Arc::new(Mutex::new(receiver)),
+            opcodes,
+            sender,
+        }
+    }
+
+    fn var_value_to_lua_val(lua: &Lua, value: VarValue) -> mlua::Result<Value> {
+        match value {
+            VarValue::Nil => Ok(Value::Nil),
+            VarValue::Boolean(b) => Ok(Value::Boolean(b)),
+            VarValue::Integer(i) => Ok(Value::Integer(i.try_into().unwrap())),
+            VarValue::Number(n) => Ok(Value::Number(n)),
+            VarValue::String(s) => Ok(Value::String(lua.create_string(&s)?)),
+            VarValue::HashMap(map) => {
+                let table = lua.create_table()?;
+                for (k, v) in map {
+                    table.set(k, Self::var_value_to_lua_val(lua, v)?)?;
+                }
+                Ok(Value::Table(table))
+            }
+            VarValue::ErrorType(e) => Err(mlua::Error::RuntimeError(e.to_string())),
+            _ => Err(mlua::Error::RuntimeError("Invalid type".into())),
+        }
+    }
+
+    fn inject_dynamic_global_state(
+        lua: &Lua,
+        storage: Arc<X>,
+        instance: usize,
+    ) -> mlua::Result<Option<Table>> {
+        lua.set_app_data(storage);
+
+        let getter = lua.create_function(move |lua, (global, key): (Table, String)| {
+            match global.raw_get::<_, Value>(key.clone())?.into() {
+                Value::Nil => (),
+                local_value => return Ok(local_value),
+            };
+            let storage = lua
+                .app_data_ref::<Arc<X>>()
+                .ok_or(mlua::Error::MismatchedRegistryKey)?
+                .clone();
+            let value = block_on(async move { storage.get(instance, key.into()).await });
+            Self::var_value_to_lua_val(lua, value)
+        })?;
+        let setter =
+            lua.create_function(move |lua, (global, key, value): (Table, String, Value)| {
+                let storage = lua
+                    .app_data_ref::<Arc<X>>()
+                    .ok_or(mlua::Error::MismatchedRegistryKey)?
+                    .clone();
+                let value: VarValue = if let Ok(value) = value.as_ref().try_into() {
+                    value
+                } else {
+                    return global.raw_set(key, value);
+                };
+                block_on(async move {
+                    storage.set(instance, key.into(), value).await;
+                    Ok(())
+                })
+            })?;
+
+        let metatable = lua.create_table()?;
+        metatable.raw_set("__index", getter)?;
+        metatable.raw_set("__newindex", setter)?;
+
+        Ok(Some(metatable))
+    }
+
+    /// Returns a new Lua VM and a list of all the global variables to be
+    /// persisted and read from the storage engine. Since lua is a dynamic
+    /// language, other state variables may be read/updated dynamically, which
+    /// is fine, this list is just for the initial state and any potential
+    /// optimization.
+    fn execute_program(state: Arc<X>, instance: usize, bytecode: &[u8]) -> mlua::Result<VarValue> {
+        let lua = Lua::new();
+        let globals = lua.globals();
+
+        let require = lua.create_function(|_, (_,): (String,)| -> mlua::Result<()> {
+            Err(mlua::Error::RuntimeError("require is not allowed".into()))
+        })?;
+
+        globals.set_metatable(Self::inject_dynamic_global_state(
+            &lua,
+            state.clone(),
+            instance,
+        )?);
+        lua.set_memory_limit(100 * 1024 * 1024)?;
+
+        // remove external require
+        globals.set("require", require)?;
+        drop(globals);
+
+        // load main program
+        let x: Value = lua.load(bytecode).call(())?;
+
+        // shutdown the execution and let the storage / state engine know so all
+        // locked variables by this execution_id can be released
+        block_on(async move {
+            state.shutdown(instance).await;
+        });
+
+        x.as_ref().try_into().map_err(|_| mlua::Error::StackError)
+    }
+
+    fn spawn(
+        storage: Arc<X>,
+        bytecode: Vec<u8>,
+        instances: Arc<AtomicUsize>,
+        exec_id: Arc<AtomicUsize>,
+        running: Arc<AtomicU16>,
+        receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
+    ) {
+        if instances.load(Ordering::Relaxed) > 100 {
+            return;
+        }
+
+        instances.fetch_add(1, Ordering::Relaxed);
+        let max_timeout = Duration::from_secs(30);
+
+        tokio::task::spawn_blocking(move || {
+            loop {
+                if let Ok(mut queue) =
+                    futures::executor::block_on(timeout(max_timeout, receiver.lock()))
+                {
+                    if let Ok(Some((_inputs, output))) =
+                        futures::executor::block_on(timeout(max_timeout, queue.recv()))
+                    {
+                        let exec_id: usize = exec_id.fetch_add(1, Ordering::Relaxed);
+                        // drop queue lock to release the mutex so any other
+                        // free VM can use it to listen for incoming messages
+                        drop(queue);
+
+                        let ret =
+                            Self::execute_program(storage.clone(), exec_id, &bytecode).unwrap();
+
+                        running.fetch_add(1, Ordering::Relaxed);
+                        let _ = output.send(ret).unwrap();
+                        continue;
+                    }
+                }
+                break;
+            }
+
+            println!("Lua listener is exiting");
+            instances.fetch_sub(1, Ordering::Relaxed);
+        });
+    }
+
+    pub async fn exec(&self, input: Vec<VarValue>) -> VarValue {
+        let (return_notifier, return_listener) = oneshot::channel();
+        Self::spawn(
+            self.storage.clone(),
+            self.opcodes.clone(),
+            self.instances.clone(),
+            self.execution_id.clone(),
+            self.running.clone(),
+            self.receiver.clone(),
+        );
+        self.sender
+            .send((input, return_notifier))
+            .await
+            .expect("valid");
+        return_listener.await.expect("valid")
+    }
+}
+
+impl TryFrom<&Value<'_>> for VarValue {
+    type Error = String;
+
+    fn try_from(value: &Value<'_>) -> Result<Self, Self::Error> {
+        match value {
+            Value::Nil => Ok(VarValue::Nil),
+            Value::Boolean(b) => Ok(VarValue::Boolean(*b)),
+            Value::Integer(i) => Ok(VarValue::Integer((*i).into())),
+            Value::Number(n) => Ok(VarValue::Number(*n)),
+            Value::String(s) => Ok(VarValue::String(s.to_str().unwrap().to_owned())),
+            Value::Table(t) => {
+                let mut map = HashMap::new();
+                let mut iter = t.clone().pairs::<String, Value>().enumerate();
+                let mut is_vector = true;
+                while let Some((id, Ok((k, v)))) = iter.next() {
+                    if Ok(id + 1) != k.parse() {
+                        is_vector = false;
+                    }
+                    map.insert(k, v.as_ref().try_into()?);
+                }
+
+                Ok(if is_vector {
+                    let mut values = map
+                        .into_iter()
+                        .map(|(k, v)| k.parse().map(|k| (k, v)))
+                        .collect::<Result<Vec<(usize, VarValue)>, _>>()
+                        .unwrap();
+
+                    values.sort_by(|(a, _), (b, _)| a.cmp(b));
+
+                    VarValue::Vector(values.into_iter().map(|(_, v)| v).collect())
+                } else {
+                    VarValue::HashMap(map)
+                })
+            }
+            x => Err(format!("Invalid type: {:?}", x)),
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct VarStorageMem {
+    storage: RwLock<HashMap<String, VarValue>>,
+    locks: RwLock<HashMap<String, usize>>,
+    var_locked_by_instance: Mutex<HashMap<usize, Vec<String>>>,
+}
+
+impl Default for VarStorageMem {
+    fn default() -> Self {
+        Self {
+            storage: RwLock::new(HashMap::new()),
+            locks: RwLock::new(HashMap::new()),
+            var_locked_by_instance: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl VarStorageMem {
+    async fn lock(&self, instance: usize, var: &Variable) {
+        let locks = self.locks.read().await;
+        let name = var.name();
+
+        if locks.get(name).map(|v| *v) == Some(instance) {
+            // The variable is already locked by this instance
+            return;
+        }
+
+        drop(locks);
+
+        loop {
+            // wait here while the locked is not null or it is locked by another
+            // instance
+            let locks = self.locks.read().await;
+            let var_lock = locks.get(name).map(|v| *v);
+            if var_lock.is_none() || var_lock == Some(instance) {
+                break;
+            }
+            drop(locks);
+            tokio::time::sleep(Duration::from_micros(10)).await;
+        }
+
+        loop {
+            let mut locks = self.locks.write().await;
+            let var_lock = locks.get(name).map(|v| *v);
+
+            if !var_lock.is_none() {
+                if var_lock == Some(instance) {
+                    break;
+                }
+                drop(locks);
+                tokio::time::sleep(Duration::from_micros(10)).await;
+                continue;
+            }
+
+            locks.insert(name.to_owned(), instance);
+
+            let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+            if let Some(vars) = vars_by_instance.get_mut(&instance) {
+                vars.push(name.to_owned());
+            } else {
+                vars_by_instance.insert(instance, vec![name.to_owned()]);
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl VarStorage for VarStorageMem {
+    async fn get(&self, instance: usize, var: Variable) -> VarValue {
+        self.lock(instance, &var).await;
+        self.storage
+            .read()
+            .await
+            .get(var.name())
+            .cloned()
+            .unwrap_or(VarValue::Nil)
+    }
+
+    async fn set(&self, instance: usize, var: Variable, value: VarValue) {
+        self.lock(instance, &var).await;
+        self.storage
+            .write()
+            .await
+            .insert(var.name().to_owned(), value);
+    }
+
+    async fn shutdown(&self, instance: usize) {
+        let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+        let mut locks = self.locks.write().await;
+
+        if let Some(vars) = vars_by_instance.remove(&instance) {
+            for var in vars {
+                if locks.get(&var).map(|v| *v) == Some(instance) {
+                    locks.remove(&var);
+                }
+            }
+        }
+    }
+}
+
+pub struct Runtime<K: Hash + Eq, X: VarStorage + 'static> {
+    vms: RwLock<HashMap<K, Program<X>>>,
+}
+
+impl<K: Hash + Eq, X: VarStorage + 'static> Runtime<K, X> {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self {
+            vms: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub async fn register_program(&self, name: K, program: &str, storage: Arc<X>) -> bool {
+        self.register_opcodes(name, Compiler::new().compile(program), storage)
+            .await
+    }
+
+    pub async fn exec(&self, id: &K) -> Option<VarValue> {
+        if let Some(vm) = self.vms.read().await.get(id) {
+            Some(vm.exec(vec![VarValue::Integer(1)]).await)
+        } else {
+            None
+        }
+    }
+
+    pub async fn register_opcodes(&self, name: K, opcodes: Vec<u8>, storage: Arc<X>) -> bool {
+        let mut vms = self.vms.write().await;
+
+        vms.insert(name, Program::new(opcodes, storage)).is_some()
+    }
+
+    pub async fn shutdown(&self) {
+        let mut vms = self.vms.write().await;
+        vms.clear();
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    use std::{sync::Arc, time::Instant};
+
+    let mem = Arc::new(VarStorageMem::default());
+
+    async fn do_loop(vms: Arc<Runtime<String, VarStorageMem>>) {
+        // Create N threads to execute the Lua code in parallel
+        let num_threads = 400;
+        let (tx, mut rx) = mpsc::channel(num_threads);
+        for _ in 0..num_threads {
+            let vm = vms.clone();
+            let tx_clone = tx.clone();
+
+            tokio::spawn(async move {
+                let start_time = Instant::now();
+                let result = vm.exec(&"foo".to_owned()).await;
+
+                // Send the result back to the main thread
+                let _ = tx_clone.send(result).await;
+
+                let elapsed_time = Instant::now() - start_time;
+
+                // Print the elapsed time in seconds and milliseconds
+                println!(
+                    "Elapsed time: {} seconds {} milliseconds",
+                    elapsed_time.as_secs(),
+                    elapsed_time.as_millis(),
+                );
+            });
+        }
+
+        drop(tx);
+
+        loop {
+            let result = rx.recv().await;
+            if result.is_none() {
+                break;
+            }
+            println!("Result: {:?}", result.unwrap());
+        }
+    }
+
+    let vms = Runtime::new();
+
+    // Compile Lua code
+    let _code = r#"
+        function add(a, b)
+            calls = calls + 1
+            print("Call from old " .. pid .. " "  .. calls)
+            return a + b
+        end
+        print("hello world " .. pid)
+    "#;
+    let code = r#"
+        if pid == nil then
+            pid = 0
+        end
+        pid = pid + 1
+        print("hello world " .. pid)
+        return true
+    "#;
+
+    let _ = vms
+        .register_program("foo".to_owned(), code, mem.clone())
+        .await;
+    do_loop(vms.clone()).await;
+
+    let code = r#"
+        if pid == nil then
+            pid = 0
+        end
+        pid = pid + 1
+        function add(a, b)
+            foo = {1,"foo"}
+            print("Call from new " .. pid .. " ")
+            return a + b
+        end
+        print("hello world " .. pid .. " = " .. add(pid, pid))
+        return false
+    "#;
+    let y = vms
+        .register_program("foo".to_owned(), code, mem.clone())
+        .await;
+    tokio::time::sleep(Duration::from_secs(3)).await;
+
+    do_loop(vms.clone()).await;
+
+    vms.shutdown().await;
+
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    println!("{} {:?}", "foo", mem);
+}

+ 135 - 0
utxo/src/broadcaster.rs

@@ -0,0 +1,135 @@
+//! Broadcaster implementation
+use crate::{worker::Worker, Filter, FilterableValue, Transaction};
+use async_trait::async_trait;
+use std::{
+    collections::HashMap,
+    sync::atomic::{AtomicBool, AtomicUsize},
+};
+use tokio::sync::{
+    mpsc::{error::TrySendError, Sender},
+    RwLock,
+};
+
+#[derive(Debug)]
+/// Broadcaster
+///
+/// This structure broadcasts the transactions to all subscribers in a separated working thread.
+pub struct Broadcaster {
+    subscribers: RwLock<HashMap<usize, Sender<Transaction>>>,
+    subscriptions: RwLock<HashMap<FilterableValue, Vec<(Filter, usize)>>>,
+    is_there_any_subscriber: AtomicBool,
+    index: AtomicUsize,
+}
+
+impl Default for Broadcaster {
+    fn default() -> Self {
+        Self {
+            subscribers: RwLock::new(HashMap::<usize, Sender<_>>::new()),
+            subscriptions: RwLock::new(HashMap::<FilterableValue, Vec<_>>::new()),
+            is_there_any_subscriber: false.into(),
+            index: 0.into(),
+        }
+    }
+}
+
+impl Broadcaster {
+    pub async fn subscribers(&self) -> usize {
+        self.subscribers.read().await.len()
+    }
+
+    /// Adds a subscriber to new transactions given a filter
+    pub async fn subscribe(&self, filter: Filter, sender: Sender<Transaction>) {
+        let mut listeners = self.subscriptions.write().await;
+        let filter = filter.prepare();
+
+        let sender_index = self
+            .index
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        self.subscribers.write().await.insert(sender_index, sender);
+
+        self.is_there_any_subscriber
+            .store(true, std::sync::atomic::Ordering::Release);
+
+        let (primary_filter, filter) = filter.get_primary_filter();
+        let key_filter: Vec<FilterableValue> = primary_filter.into();
+
+        for key_filter in key_filter {
+            if let Some(previous_values) = listeners.get_mut(&key_filter) {
+                previous_values.push((filter.clone(), sender_index));
+            } else {
+                listeners.insert(key_filter, vec![(filter.clone(), sender_index)]);
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl Worker for Broadcaster {
+    type Payload = Transaction;
+
+    fn process_request(&self) -> bool {
+        self.is_there_any_subscriber
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    async fn handler(&self, transaction: Self::Payload) {
+        let listeners = self.subscriptions.read().await;
+        let senders = self.subscribers.read().await;
+
+        let mut subscriptions_to_reindex = vec![];
+        let mut senders_to_remove = vec![];
+
+        for primary_filter in transaction.get_filterable_fields() {
+            let listeners = if let Some(listeners) = listeners.get(&primary_filter) {
+                listeners
+            } else {
+                continue;
+            };
+
+            for (filter, sender_index) in listeners {
+                if filter.matches(&transaction.transaction, &transaction.revision) {
+                    if let Some(Err(TrySendError::Closed(_))) = senders
+                        .get(sender_index)
+                        .map(|sender| sender.try_send(transaction.clone()))
+                    {
+                        senders_to_remove.push(*sender_index);
+                        subscriptions_to_reindex.push(primary_filter.clone());
+                    }
+                }
+            }
+        }
+
+        drop(listeners);
+        drop(senders);
+
+        if !senders_to_remove.is_empty() {
+            let mut listeners = self.subscriptions.write().await;
+            let mut senders = self.subscribers.write().await;
+
+            for to_remove in &senders_to_remove {
+                senders.remove(to_remove);
+            }
+
+            for to_rebuild in subscriptions_to_reindex {
+                if let Some(list_of_senders) = listeners.remove(&to_rebuild) {
+                    listeners.insert(
+                        to_rebuild,
+                        list_of_senders
+                            .into_iter()
+                            .filter(|x| senders.contains_key(&x.1))
+                            .collect::<Vec<_>>(),
+                    );
+                }
+            }
+
+            drop(listeners);
+            drop(senders);
+
+            if self.subscribers().await == 0 {
+                self.is_there_any_subscriber
+                    .store(false, std::sync::atomic::Ordering::Release);
+            }
+        }
+    }
+}

+ 215 - 2
utxo/src/filter.rs

@@ -1,12 +1,76 @@
-use crate::{AccountId, RevId, Tag, TxId, Type};
+use crate::{AccountId, BaseTx, RevId, Revision, Status, Tag, TxId, Type};
 use chrono::{DateTime, Utc};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
 
 
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+/// The primary filter is used to filter the transactions before applying the other filters. Think
+/// of it like which key is used to filter transactions quickly before applying the other filters.
+pub enum PrimaryFilter {
+    /// By transaction ID
+    TxId(Vec<TxId>),
+    /// By revision ID
+    Revision(Vec<RevId>),
+    /// By accounts
+    Account(Vec<AccountId>),
+    /// By transaction type
+    Type(Vec<Type>),
+    /// By transaction status
+    Status(Vec<Status>),
+    /// By tags
+    Tags(Vec<Tag>),
+    /// By transaction status
+    Stream,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[allow(warnings)]
+/// Filterable Value
+///
+/// Individual value from a transaction, and their type, that can be used to filter a transaction,
+/// either at realtime throught the subscription API, or through the filter API in the storage layer
+pub enum FilterableValue {
+    /// Transaction ID
+    TxId(TxId),
+    /// Revision ID
+    Revision(RevId),
+    /// Account
+    Account(AccountId),
+    /// Status
+    Status(Status),
+    /// Transaction type
+    Type(Type),
+    /// Tag
+    Tag(Tag),
+    /// Any other filterable value. This is being used to subscribe to all new transactions
+    Anything,
+}
+
+impl From<PrimaryFilter> for Vec<FilterableValue> {
+    fn from(value: PrimaryFilter) -> Self {
+        match value {
+            PrimaryFilter::TxId(ids) => ids.into_iter().map(FilterableValue::TxId).collect(),
+            PrimaryFilter::Revision(revisions) => revisions
+                .into_iter()
+                .map(FilterableValue::Revision)
+                .collect(),
+            PrimaryFilter::Account(accounts) => {
+                accounts.into_iter().map(FilterableValue::Account).collect()
+            }
+            PrimaryFilter::Status(statuses) => {
+                statuses.into_iter().map(FilterableValue::Status).collect()
+            }
+            PrimaryFilter::Type(types) => types.into_iter().map(FilterableValue::Type).collect(),
+            PrimaryFilter::Tags(tags) => tags.into_iter().map(FilterableValue::Tag).collect(),
+            PrimaryFilter::Stream => vec![FilterableValue::Anything],
+        }
+    }
+}
+
 /// Filter transactions
 /// Filter transactions
 ///
 ///
 /// All this filters options are AND, meaning that an object must match all the
 /// All this filters options are AND, meaning that an object must match all the
 /// requirements in order to be included in the resultset.
 /// requirements in order to be included in the resultset.
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, PartialEq, Eq, Hash, Deserialize, Default)]
 pub struct Filter {
 pub struct Filter {
     /// List of transaction IDs to query
     /// List of transaction IDs to query
     #[serde(skip_serializing_if = "Vec::is_empty")]
     #[serde(skip_serializing_if = "Vec::is_empty")]
@@ -20,6 +84,9 @@ pub struct Filter {
     /// List of transaction types-kind
     /// List of transaction types-kind
     #[serde(rename = "type", skip_serializing_if = "Vec::is_empty")]
     #[serde(rename = "type", skip_serializing_if = "Vec::is_empty")]
     pub typ: Vec<Type>,
     pub typ: Vec<Type>,
+    /// List of statuses to query
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub status: Vec<Status>,
     /// List of transactions by tags
     /// List of transactions by tags
     #[serde(skip_serializing_if = "Vec::is_empty")]
     #[serde(skip_serializing_if = "Vec::is_empty")]
     pub tags: Vec<Tag>,
     pub tags: Vec<Tag>,
@@ -46,6 +113,94 @@ pub struct Filter {
 }
 }
 
 
 impl Filter {
 impl Filter {
+    /// Extracts the PrimaryFilter from the filter and prepares the fitler to be used with `matches`
+    /// method
+    pub fn get_primary_filter(mut self) -> (PrimaryFilter, Self) {
+        let primary_filter = if !self.revisions.is_empty() {
+            PrimaryFilter::Revision(std::mem::take(&mut self.revisions))
+        } else if !self.ids.is_empty() {
+            PrimaryFilter::TxId(std::mem::take(&mut self.ids))
+        } else if !self.accounts.is_empty() {
+            PrimaryFilter::Account(std::mem::take(&mut self.accounts))
+        } else if !self.typ.is_empty() {
+            PrimaryFilter::Type(std::mem::take(&mut self.typ))
+        } else if !self.tags.is_empty() {
+            PrimaryFilter::Tags(std::mem::take(&mut self.tags))
+        } else if !self.status.is_empty() {
+            PrimaryFilter::Status(std::mem::take(&mut self.status))
+        } else {
+            PrimaryFilter::Stream
+        };
+
+        (primary_filter, self.prepare())
+    }
+
+    /// Takes the filter and sorts it. This is a pre-requist for matching
+    pub fn prepare(mut self) -> Self {
+        self.ids.sort();
+        self.revisions.sort();
+        self.accounts.sort();
+        self.typ.sort();
+        self.status.sort();
+        self.tags.sort();
+        self
+    }
+
+    /// Check if the base transaction and the revision matches the current cursor filter
+    ///
+    /// Before this function is called, the filter needs to be prepareted with `prepare` function
+    pub fn matches(&self, base: &BaseTx, revision: &Revision) -> bool {
+        let since = self
+            .since
+            .map(|since| since <= base.created_at)
+            .unwrap_or(true);
+
+        let until = self
+            .until
+            .map(|until| until >= base.created_at)
+            .unwrap_or(true);
+
+        if !since || !until {
+            return false;
+        }
+
+        if !self.status.is_empty() && self.status.binary_search(&revision.status).is_err() {
+            return false;
+        }
+
+        if !self.ids.is_empty() && self.ids.binary_search(&revision.transaction_id).is_err() {
+            return false;
+        }
+
+        if !self.revisions.is_empty()
+            && self
+                .revisions
+                .binary_search(&revision.rev_id().expect("vv"))
+                .is_err()
+        {
+            return false;
+        }
+
+        if !self.typ.is_empty() && self.typ.binary_search(&base.typ).is_err() {
+            return false;
+        }
+
+        if !self.tags.is_empty() {
+            let mut found = false;
+            for tag in revision.tags.iter() {
+                if self.tags.binary_search(tag).is_ok() {
+                    found = true;
+                    break;
+                }
+            }
+            if !found {
+                return false;
+            }
+        }
+
+        true
+    }
+
     /// Adds a given kind to the filter
     /// Adds a given kind to the filter
     pub fn kind(mut self, kind: Type) -> Self {
     pub fn kind(mut self, kind: Type) -> Self {
         self.typ.push(kind);
         self.typ.push(kind);
@@ -147,3 +302,61 @@ pub(crate) mod option_ts_seconds {
         }
         }
     }
     }
 }
 }
+
+#[cfg(test)]
+mod test {
+    use chrono::DateTime;
+
+    use crate::{Filter, Transaction};
+
+    #[test]
+    fn since_and_until() {
+        let current = Transaction::new_external_deposit(
+            "test".to_owned(),
+            "settled".parse().expect("valid status"),
+            vec![],
+            vec![],
+        )
+        .expect("valid transaction");
+
+        let filter_until = Filter {
+            until: DateTime::from_timestamp(9999, 99),
+            ..Default::default()
+        }
+        .prepare();
+        let filter_since = Filter {
+            since: DateTime::from_timestamp(9999, 99),
+            ..Default::default()
+        }
+        .prepare();
+        assert!(!filter_until.matches(&current.transaction, &current.revision));
+        assert!(filter_since.matches(&current.transaction, &current.revision));
+    }
+
+    #[test]
+    fn status() {
+        let current = Transaction::new_external_deposit(
+            "test".to_owned(),
+            "settled".parse().expect("valid status"),
+            vec![],
+            vec![],
+        )
+        .expect("valid transaction");
+
+        let filter_foo = Filter {
+            status: vec!["foo".parse().expect("valid status")],
+            ..Default::default()
+        }
+        .prepare();
+        let filter_settled = Filter {
+            status: vec![
+                "settled".parse().expect("valid status"),
+                "a".parse().expect("valid statuses"),
+            ],
+            ..Default::default()
+        }
+        .prepare();
+        assert!(!filter_foo.matches(&current.transaction, &current.revision));
+        assert!(filter_settled.matches(&current.transaction, &current.revision));
+    }
+}

+ 219 - 65
utxo/src/ledger.rs

@@ -1,9 +1,17 @@
 use crate::{
 use crate::{
-    amount::AmountCents, config::Config, status::StatusManager, storage::Storage,
-    transaction::Type, AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status,
-    Tag, Transaction, TxId,
+    amount::AmountCents,
+    broadcaster::Broadcaster,
+    config::Config,
+    status::{InternalStatus, StatusManager},
+    storage::{Batch, ReceivedPaymentStatus, Storage},
+    transaction::Error as TxError,
+    transaction::Type,
+    worker::WorkerManager,
+    AccountId, Amount, Error, Filter, PaymentFrom, PaymentId, RevId, Status, Tag, Transaction,
+    TxId,
 };
 };
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
 use std::{cmp::Ordering, collections::HashMap, sync::Arc};
+use tokio::sync::mpsc::{self, Receiver, Sender};
 
 
 /// The Verax ledger
 /// The Verax ledger
 #[derive(Debug)]
 #[derive(Debug)]
@@ -11,18 +19,8 @@ pub struct Ledger<S>
 where
 where
     S: Storage + Sync + Send,
     S: Storage + Sync + Send,
 {
 {
-    config: Arc<Config<S>>,
-}
-
-impl<S> Clone for Ledger<S>
-where
-    S: Storage + Sync + Send,
-{
-    fn clone(&self) -> Self {
-        Self {
-            config: self.config.clone(),
-        }
-    }
+    config: Config<S>,
+    broadcaster: WorkerManager<Broadcaster>,
 }
 }
 
 
 impl<S> Ledger<S>
 impl<S> Ledger<S>
@@ -30,28 +28,44 @@ where
     S: Storage + Sync + Send,
     S: Storage + Sync + Send,
 {
 {
     /// Creates a new ledger instance
     /// Creates a new ledger instance
-    pub fn new(config: Config<S>) -> Self {
-        Self {
-            config: Arc::new(config),
-        }
+    pub fn new(config: Config<S>) -> Arc<Self> {
+        Arc::new(Self {
+            config,
+            broadcaster: WorkerManager::new(Broadcaster::default()),
+        })
+    }
+
+    /// Subscribes to new transactions and revisions with a given filter
+    pub async fn subscribe(&self, filter: Filter) -> (Sender<Transaction>, Receiver<Transaction>) {
+        let (sender, receiver) = mpsc::channel(100);
+        self.subscribe_with_sender(filter, sender.clone()).await;
+        (sender, receiver)
+    }
+
+    /// Subscribe to new transactions and revisions with a given filter and a receiver.
+    pub async fn subscribe_with_sender(&self, filter: Filter, sender: Sender<Transaction>) {
+        self.broadcaster.subscribe(filter, sender).await;
+    }
+
+    /// Returns the total number of active subscribers
+    pub async fn subscribers(&self) -> usize {
+        self.broadcaster.subscribers().await
     }
     }
 
 
-    /// The internal usage is to select unspent payments for each account to
-    /// create new transactions. The external API however does not expose that
-    /// level of usage, instead it exposes a simple API to move funds using
-    /// accounts to debit from and accounts to credit to. A single transaction
-    /// can use multiple accounts to debit and credit, instead of a single
+    /// The internal usage is to select unspent payments for each account to create new
+    /// transactions. The external API however does not expose that level of usage, instead it
+    /// exposes a simple API to move funds using accounts to debit from and accounts to credit to. A
+    /// single transaction can use multiple accounts to debit and credit, instead of a single
     /// account.
     /// account.
     ///
     ///
-    /// This function selects the unspent payments to be used in a transaction,
-    /// in a descending order (making sure to include any negative deposit).
+    /// This function selects the unspent payments to be used in a transaction, in a descending
+    /// order (making sure to include any negative deposit).
     ///
     ///
-    /// This function returns a vector of payments to be used as inputs and
-    /// optionally a dependent transaction to be executed first. This
-    /// transaction is an internal transaction and it settles immediately. It is
-    /// used to split an existing payment into two payments, one to be used as
-    /// input and the other to be used as change. This is done to avoid locking
-    /// any change amount until the main transaction settles.
+    /// This function returns a vector of payments to be used as inputs and optionally a dependent
+    /// transaction to be executed first. This transaction is an internal transaction and it settles
+    /// immediately. It is used to split an existing payment into two payments, one to be used as
+    /// input and the  other to be used as change. This is done to avoid locking any change amount
+    /// until the main transaction settles.
     async fn select_payments_from_accounts(
     async fn select_payments_from_accounts(
         &self,
         &self,
         payments: Vec<(AccountId, Amount)>,
         payments: Vec<(AccountId, Amount)>,
@@ -172,6 +186,141 @@ where
         Ok((exchange_tx, payments))
         Ok((exchange_tx, payments))
     }
     }
 
 
+    #[inline]
+    /// Persist a new base transaction
+    ///
+    /// This operation should only happen once, because if it is executed multiple times the storage
+    /// layer should fail. Base transactions are not allowed to be ammened, only revisions.
+    async fn store_base_transaction(
+        transaction: &Transaction,
+        batch: &mut S::Batch<'_>,
+    ) -> Result<(), Error> {
+        let spends = transaction
+            .spends
+            .iter()
+            .map(|x| x.id.clone())
+            .collect::<Vec<_>>();
+        batch
+            .spend_payments(
+                &transaction.revision.transaction_id,
+                spends,
+                ReceivedPaymentStatus::Locked,
+            )
+            .await?;
+        batch
+            .create_payments(
+                &transaction.revision.transaction_id,
+                &transaction.creates,
+                ReceivedPaymentStatus::Locked,
+            )
+            .await?;
+
+        for account in transaction.accounts() {
+            batch
+                .relate_account_to_transaction(
+                    &transaction.revision.transaction_id,
+                    &account,
+                    transaction.typ,
+                )
+                .await?;
+        }
+        batch
+            .store_base_transaction(
+                &transaction.revision.transaction_id,
+                &transaction.transaction,
+            )
+            .await?;
+        Ok(())
+    }
+
+    /// Stores the current transaction object to the storage layer.
+    ///
+    /// This method is not idempotent, and it will fail if the transaction if the requested update
+    /// is not allowed.
+    ///
+    /// This function will store the base transaction if it is the first revision, and will create a
+    /// new revision otherwise.
+    pub async fn store(&self, transaction: Transaction) -> Result<Transaction, Error> {
+        transaction.validate()?;
+
+        let mut batch = self.config.storage.begin().await?;
+        if transaction.revision.previous.is_none() {
+            Self::store_base_transaction(&transaction, &mut batch).await?;
+        }
+
+        let (created_updated, spent_updated) = match self
+            .config
+            .status
+            .internal_type(&transaction.revision.status)
+        {
+            InternalStatus::Reverted => {
+                batch
+                    .update_transaction_payments(
+                        &transaction.id,
+                        ReceivedPaymentStatus::Failed,
+                        ReceivedPaymentStatus::Spendable,
+                    )
+                    .await?
+            }
+            InternalStatus::Spendable => {
+                batch
+                    .update_transaction_payments(
+                        &transaction.id,
+                        ReceivedPaymentStatus::Spendable,
+                        ReceivedPaymentStatus::Spent,
+                    )
+                    .await?
+            }
+            _ => (transaction.creates.len(), transaction.spends.len()),
+        };
+
+        if transaction.creates.len() != created_updated || transaction.spends.len() != spent_updated
+        {
+            return Err(Error::Transaction(TxError::NoUpdate));
+        }
+
+        if self
+            .config
+            .status
+            .is_spendable(&transaction.revision.status)
+        {
+            batch
+                .update_transaction_payments(
+                    &transaction.id,
+                    ReceivedPaymentStatus::Spendable,
+                    ReceivedPaymentStatus::Spent,
+                )
+                .await?;
+        }
+
+        batch
+            .store_revision(&transaction.revision_id, &transaction.revision)
+            .await?;
+
+        batch
+            .tag_transaction(
+                &transaction.id,
+                &transaction.transaction,
+                &transaction.revision.tags,
+            )
+            .await?;
+
+        batch
+            .update_transaction_revision(
+                &transaction.id,
+                &transaction.revision_id,
+                transaction.revision.previous.as_ref(),
+            )
+            .await?;
+
+        batch.commit().await?;
+
+        // The transaction is persisted and now it is time to broadcast it to any possible listener
+        self.broadcaster.process(transaction.clone());
+
+        Ok(transaction)
+    }
+
     /// Creates a new transaction and returns it.
     /// Creates a new transaction and returns it.
     ///
     ///
     /// The input is pretty simple, take this amounts from these given accounts
     /// The input is pretty simple, take this amounts from these given accounts
@@ -200,13 +349,11 @@ where
         to: Vec<(AccountId, Amount)>,
         to: Vec<(AccountId, Amount)>,
     ) -> Result<Transaction, Error> {
     ) -> Result<Transaction, Error> {
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
         let (change_transaction, payments) = self.select_payments_from_accounts(from).await?;
-        if let Some(mut change_tx) = change_transaction {
-            change_tx.persist(&self.config).await?;
+        if let Some(change_tx) = change_transaction {
+            self.store(change_tx).await?;
         }
         }
-        let mut transaction =
-            Transaction::new(reference, status, Type::Transaction, payments, to).await?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.store(Transaction::new(reference, status, Type::Transaction, payments, to).await?)
+            .await
     }
     }
 
 
     /// Return the balances from a given account
     /// Return the balances from a given account
@@ -230,12 +377,16 @@ where
         account: &AccountId,
         account: &AccountId,
         amount: Amount,
         amount: Amount,
         status: Status,
         status: Status,
+        tags: Vec<Tag>,
         reference: String,
         reference: String,
     ) -> Result<Transaction, Error> {
     ) -> Result<Transaction, Error> {
-        let mut transaction =
-            Transaction::new_external_deposit(reference, status, vec![(account.clone(), amount)])?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.store(Transaction::new_external_deposit(
+            reference,
+            status,
+            tags,
+            vec![(account.clone(), amount)],
+        )?)
+        .await
     }
     }
 
 
     /// Creates a new withdrawal transaction and returns it.
     /// Creates a new withdrawal transaction and returns it.
@@ -254,12 +405,13 @@ where
         let (change_transactions, payments) = self
         let (change_transactions, payments) = self
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .select_payments_from_accounts(vec![(account.clone(), amount)])
             .await?;
             .await?;
-        for mut change_tx in change_transactions.into_iter() {
-            change_tx.persist(&self.config).await?;
+        for change_tx in change_transactions.into_iter() {
+            self.store(change_tx).await?;
         }
         }
-        let mut transaction = Transaction::new_external_withdrawal(reference, status, payments)?;
-        transaction.persist(&self.config).await?;
-        Ok(transaction)
+        self.store(Transaction::new_external_withdrawal(
+            reference, status, payments,
+        )?)
+        .await
     }
     }
 
 
     /// Returns the payment object by a given payment id
     /// Returns the payment object by a given payment id
@@ -306,15 +458,16 @@ where
             limit: 1,
             limit: 1,
             ..Default::default()
             ..Default::default()
         };
         };
-        Ok(self
-            .config
-            .storage
-            .find(filter)
-            .await?
-            .pop()
-            .ok_or(Error::TxNotFound)?
-            .set_tags(&self.config, tags, reason)
-            .await?)
+        self.store(
+            self.config
+                .storage
+                .find(filter)
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?
+                .set_tags(tags, reason)?,
+        )
+        .await
     }
     }
 
 
     /// Attempts to change the status of a given transaction id. On success the
     /// Attempts to change the status of a given transaction id. On success the
@@ -330,14 +483,15 @@ where
             limit: 1,
             limit: 1,
             ..Default::default()
             ..Default::default()
         };
         };
-        Ok(self
-            .config
-            .storage
-            .find(filter)
-            .await?
-            .pop()
-            .ok_or(Error::TxNotFound)?
-            .change_status(&self.config, new_status, reason)
-            .await?)
+        self.store(
+            self.config
+                .storage
+                .find(filter)
+                .await?
+                .pop()
+                .ok_or(Error::TxNotFound)?
+                .change_status(&self.config, new_status, reason)?,
+        )
+        .await
     }
     }
 }
 }

+ 3 - 1
utxo/src/lib.rs

@@ -25,6 +25,7 @@
 
 
 mod amount;
 mod amount;
 mod asset;
 mod asset;
+mod broadcaster;
 mod config;
 mod config;
 mod error;
 mod error;
 mod filter;
 mod filter;
@@ -37,6 +38,7 @@ pub mod storage;
 #[cfg(test)]
 #[cfg(test)]
 mod tests;
 mod tests;
 mod transaction;
 mod transaction;
+mod worker;
 
 
 #[cfg(test)]
 #[cfg(test)]
 pub use self::storage::test as storage_test;
 pub use self::storage::test as storage_test;
@@ -45,7 +47,7 @@ pub use self::{
     amount::{Amount, AnyAmount, HumanAmount},
     amount::{Amount, AnyAmount, HumanAmount},
     asset::Asset,
     asset::Asset,
     error::Error,
     error::Error,
-    filter::Filter,
+    filter::{Filter, FilterableValue, PrimaryFilter},
     id::*,
     id::*,
     ledger::Ledger,
     ledger::Ledger,
     payment::PaymentFrom,
     payment::PaymentFrom,

+ 1 - 1
utxo/src/storage/cache/mod.rs

@@ -149,7 +149,7 @@ mod test {
 
 
     storage_test_suite!();
     storage_test_suite!();
 
 
-    pub async fn get_ledger_and_asset_manager() -> Ledger<Cache<SQLite>> {
+    pub async fn get_ledger_and_asset_manager() -> Arc<Ledger<Cache<SQLite>>> {
         let pool = SqlitePoolOptions::new()
         let pool = SqlitePoolOptions::new()
             .max_connections(1)
             .max_connections(1)
             .idle_timeout(None)
             .idle_timeout(None)

+ 6 - 80
utxo/src/storage/cursor.rs

@@ -1,25 +1,7 @@
 //! Cursor implementation
 //! Cursor implementation
-use crate::{AccountId, BaseTx, Filter, RevId, Revision, Tag, TxId, Type};
+use crate::{BaseTx, Filter, PrimaryFilter, Revision};
 
 
-#[derive(Debug, Clone)]
-/// The primary filter is used to filter the transactions before applying the other filters. Think
-/// of it like which key is used to filter transactions quickly before applying the other filters.
-pub enum PrimaryFilter {
-    /// By transaction ID
-    TxId(Vec<TxId>),
-    /// By revision ID
-    Revision(Vec<RevId>),
-    /// By accounts
-    Account(Vec<AccountId>),
-    /// By transaction type
-    Type(Vec<Type>),
-    /// By tags
-    Tags(Vec<Tag>),
-    /// By transaction status
-    Stream,
-}
-
-#[derive(Debug, Clone)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
 /// The cursor
 /// The cursor
 pub struct Cursor {
 pub struct Cursor {
     /// The primary filter
     /// The primary filter
@@ -29,27 +11,8 @@ pub struct Cursor {
 }
 }
 
 
 impl From<Filter> for Cursor {
 impl From<Filter> for Cursor {
-    fn from(mut filter: Filter) -> Self {
-        let primary_filter = if !filter.revisions.is_empty() {
-            PrimaryFilter::Revision(std::mem::take(&mut filter.revisions))
-        } else if !filter.ids.is_empty() {
-            PrimaryFilter::TxId(std::mem::take(&mut filter.ids))
-        } else if !filter.accounts.is_empty() {
-            PrimaryFilter::Account(std::mem::take(&mut filter.accounts))
-        } else if !filter.typ.is_empty() {
-            PrimaryFilter::Type(std::mem::take(&mut filter.typ))
-        } else if !filter.tags.is_empty() {
-            PrimaryFilter::Tags(std::mem::take(&mut filter.tags))
-        } else {
-            PrimaryFilter::Stream
-        };
-
-        filter.ids.sort();
-        filter.revisions.sort();
-        filter.accounts.sort();
-        filter.typ.sort();
-        filter.tags.sort();
-
+    fn from(filter: Filter) -> Self {
+        let (primary_filter, filter) = filter.get_primary_filter();
         Self {
         Self {
             primary_filter,
             primary_filter,
             filter,
             filter,
@@ -59,44 +22,7 @@ impl From<Filter> for Cursor {
 
 
 impl Cursor {
 impl Cursor {
     /// Check if the base transaction and the revision matches the current cursor filter
     /// Check if the base transaction and the revision matches the current cursor filter
-    pub fn matches(&self, base: &BaseTx, revision: &Revision) -> bool {
-        if !self.filter.ids.is_empty()
-            && self
-                .filter
-                .ids
-                .binary_search(&revision.transaction_id)
-                .is_err()
-        {
-            return false;
-        }
-
-        if !self.filter.revisions.is_empty()
-            && self
-                .filter
-                .revisions
-                .binary_search(&revision.rev_id().expect("vv"))
-                .is_err()
-        {
-            return false;
-        }
-
-        if !self.filter.typ.is_empty() && self.filter.typ.binary_search(&base.typ).is_err() {
-            return false;
-        }
-
-        if !self.filter.tags.is_empty() {
-            let mut found = false;
-            for tag in revision.tags.iter() {
-                if self.filter.tags.binary_search(tag).is_ok() {
-                    found = true;
-                    break;
-                }
-            }
-            if !found {
-                return false;
-            }
-        }
-
-        true
+    pub fn matches(&self, base_tx: &BaseTx, revision: &Revision) -> bool {
+        self.filter.matches(base_tx, revision)
     }
     }
 }
 }

+ 201 - 16
utxo/src/storage/mod.rs

@@ -12,7 +12,7 @@ mod cursor;
 #[cfg(any(feature = "sqlite", test))]
 #[cfg(any(feature = "sqlite", test))]
 pub mod sqlite;
 pub mod sqlite;
 pub use self::cache::Cache;
 pub use self::cache::Cache;
-pub use self::cursor::{Cursor, PrimaryFilter};
+pub use self::cursor::Cursor;
 #[cfg(any(feature = "sqlite", test))]
 #[cfg(any(feature = "sqlite", test))]
 pub use self::sqlite::SQLite;
 pub use self::sqlite::SQLite;
 
 
@@ -323,11 +323,11 @@ pub trait Storage {
 
 
 #[cfg(test)]
 #[cfg(test)]
 pub mod test {
 pub mod test {
-    use std::collections::HashMap;
-
     use super::*;
     use super::*;
     use crate::{config::Config, status::StatusManager, Ledger, Transaction};
     use crate::{config::Config, status::StatusManager, Ledger, Transaction};
     use rand::Rng;
     use rand::Rng;
+    use std::{collections::HashMap, time::Duration};
+    use tokio::time::sleep;
 
 
     #[macro_export]
     #[macro_export]
     macro_rules! storage_unit_test {
     macro_rules! storage_unit_test {
@@ -344,7 +344,7 @@ pub mod test {
     macro_rules! storage_test_suite {
     macro_rules! storage_test_suite {
         () => {
         () => {
             $crate::storage_unit_test!(transaction);
             $crate::storage_unit_test!(transaction);
-            $crate::storage_unit_test!(transaction_does_not_update_stale_transactions);
+            $crate::storage_unit_test!(transaction_does_not_update_stale_revision);
             $crate::storage_unit_test!(transaction_not_available_until_commit);
             $crate::storage_unit_test!(transaction_not_available_until_commit);
             $crate::storage_unit_test!(payments_always_include_negative_amounts);
             $crate::storage_unit_test!(payments_always_include_negative_amounts);
             $crate::storage_unit_test!(does_not_update_spent_payments);
             $crate::storage_unit_test!(does_not_update_spent_payments);
@@ -352,11 +352,13 @@ pub mod test {
             $crate::storage_unit_test!(spend_spendable_payments);
             $crate::storage_unit_test!(spend_spendable_payments);
             $crate::storage_unit_test!(relate_account_to_transaction);
             $crate::storage_unit_test!(relate_account_to_transaction);
             $crate::storage_unit_test!(find_transactions_by_tags);
             $crate::storage_unit_test!(find_transactions_by_tags);
+            $crate::storage_unit_test!(find_transactions_by_status);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
             $crate::storage_unit_test!(not_spendable_new_payments_not_spendable);
+            $crate::storage_unit_test!(subscribe_realtime);
         };
         };
     }
     }
 
 
-    pub async fn transaction_does_not_update_stale_transactions<T>(storage: T)
+    pub async fn transaction_does_not_update_stale_revision<T>(storage: T)
     where
     where
         T: Storage + Send + Sync,
         T: Storage + Send + Sync,
     {
     {
@@ -365,30 +367,36 @@ pub mod test {
             status: StatusManager::default(),
             status: StatusManager::default(),
         };
         };
 
 
+        let ledger = Ledger::new(config);
+
         let asset: Asset = "USD/2".parse().expect("valid asset");
         let asset: Asset = "USD/2".parse().expect("valid asset");
-        let mut pending = Transaction::new_external_deposit(
+        let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "test reference".to_owned(),
             "pending".into(),
             "pending".into(),
+            vec![],
             vec![(
             vec![(
                 "alice".parse().expect("account"),
                 "alice".parse().expect("account"),
                 asset.from_human("100.99").expect("valid amount"),
                 asset.from_human("100.99").expect("valid amount"),
             )],
             )],
         )
         )
         .expect("valid tx");
         .expect("valid tx");
-        pending.persist(&config).await.expect("valid insert");
 
 
-        pending
-            .clone()
-            .change_status(&config, "processing".into(), "some text".to_owned())
+        let deposit = ledger.store(deposit).await.expect("valid insert");
+
+        ledger
+            .change_status(
+                deposit.revision_id.clone(),
+                "processing".into(),
+                "some text".to_owned(),
+            )
             .await
             .await
-            .expect("valid update");
+            .expect("valid updated");
 
 
-        pending
+        ledger
             .change_status(
             .change_status(
-                &config,
-                "failed".into(),
-                "update from pending to failed (which is not the latest state and should fail)"
-                    .to_owned(),
+                deposit.revision_id.clone(),
+                "processing".into(),
+                "some text".to_owned(),
             )
             )
             .await
             .await
             .expect_err("stale updates are rejected by storage");
             .expect_err("stale updates are rejected by storage");
@@ -402,6 +410,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "test reference".to_owned(),
             "settled".into(),
             "settled".into(),
+            vec![],
             vec![(
             vec![(
                 "alice".parse().expect("account"),
                 "alice".parse().expect("account"),
                 asset.from_human("100.99").expect("valid amount"),
                 asset.from_human("100.99").expect("valid amount"),
@@ -464,6 +473,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "test reference".to_owned(),
             "settled".into(),
             "settled".into(),
+            vec![],
             vec![(
             vec![(
                 "alice".parse().expect("account"),
                 "alice".parse().expect("account"),
                 usd.from_human("100.99").expect("valid amount"),
                 usd.from_human("100.99").expect("valid amount"),
@@ -595,6 +605,110 @@ pub mod test {
         }
         }
     }
     }
 
 
+    pub async fn subscribe_realtime<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let ledger = Ledger::new(Config {
+            storage,
+            status: Default::default(),
+        });
+
+        let (_, mut subscription) = ledger
+            .subscribe(Filter {
+                tags: vec!["even".parse().expect("valid tag")],
+                ..Default::default()
+            })
+            .await;
+
+        assert_eq!(1, ledger.subscribers().await);
+
+        for i in 0..10 {
+            let usd: Asset = "USD/2".parse().expect("valid asset");
+            let account = format!("account-{}", i).parse().expect("valid account");
+
+            let deposit = ledger
+                .deposit(
+                    &account,
+                    usd.from_human(&format!("10{}.99", i))
+                        .expect("valid amount"),
+                    "settled".into(),
+                    vec![],
+                    format!("test deposit {}", i),
+                )
+                .await
+                .expect("valid deposit");
+
+            if i % 2 == 0 {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "even".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            } else {
+                ledger
+                    .set_tags(
+                        deposit.revision_id,
+                        vec![
+                            "odd".parse().expect("valid tag"),
+                            "all".parse().expect("valid tag"),
+                        ],
+                        "add tags".to_owned(),
+                    )
+                    .await
+                    .expect("tag tx");
+            }
+        }
+
+        sleep(Duration::from_secs(1)).await;
+
+        let expectactions = vec!["100.99", "102.99", "104.99", "106.99", "108.99", ""];
+
+        assert_eq!(1, ledger.subscribers().await);
+        for expectation in expectactions {
+            assert_eq!(
+                expectation.to_string(),
+                subscription
+                    .try_recv()
+                    .map(|t| t.creates[0].amount.to_string())
+                    .unwrap_or_default()
+            )
+        }
+        assert_eq!(1, ledger.subscribers().await);
+
+        drop(subscription);
+
+        // TODO: Update this test when the drop() triggers the subscribers hashmap to cleanup
+        assert_eq!(1, ledger.subscribers().await);
+
+        let usd: Asset = "USD/2".parse().expect("valid asset");
+        let account = "account-99".parse().expect("valid account");
+
+        ledger
+            .deposit(
+                &account,
+                usd.from_human(&"1010.99").expect("valid amount"),
+                "settled".into(),
+                vec!["even".parse().expect("valid tag")],
+                "test deposit after the subscription listener is dropped".to_owned(),
+            )
+            .await
+            .expect("valid deposit");
+
+        sleep(Duration::from_secs(1)).await;
+
+        assert_eq!(0, ledger.subscribers().await);
+
+        drop(ledger);
+        sleep(Duration::from_secs(1)).await;
+    }
+
     pub async fn find_transactions_by_tags<T>(storage: T)
     pub async fn find_transactions_by_tags<T>(storage: T)
     where
     where
         T: Storage + Send + Sync,
         T: Storage + Send + Sync,
@@ -613,6 +727,7 @@ pub mod test {
                     &account,
                     &account,
                     usd.from_human("100.99").expect("valid amount"),
                     usd.from_human("100.99").expect("valid amount"),
                     "settled".into(),
                     "settled".into(),
+                    vec![],
                     format!("test deposit {}", i),
                     format!("test deposit {}", i),
                 )
                 )
                 .await
                 .await
@@ -683,6 +798,75 @@ pub mod test {
         );
         );
     }
     }
 
 
+    pub async fn find_transactions_by_status<T>(storage: T)
+    where
+        T: Storage + Send + Sync,
+    {
+        let ledger = Ledger::new(Config {
+            storage,
+            status: Default::default(),
+        });
+
+        for i in 0..10 {
+            let usd: Asset = "USD/2".parse().expect("valid asset");
+            let account = format!("account-{}", i).parse().expect("valid account");
+
+            ledger
+                .deposit(
+                    &account,
+                    usd.from_human("100.99").expect("valid amount"),
+                    if i % 2 == 0 {
+                        "even".into()
+                    } else {
+                        "odd".into()
+                    },
+                    vec![],
+                    format!("test deposit {}", i),
+                )
+                .await
+                .expect("valid deposit");
+        }
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    status: vec!["even".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            5,
+            ledger
+                .get_transactions(Filter {
+                    status: vec!["odd".parse().expect("valid tag")],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+
+        assert_eq!(
+            10,
+            ledger
+                .get_transactions(Filter {
+                    status: vec![
+                        "even".parse().expect("valid tag"),
+                        "odd".parse().expect("valid tag")
+                    ],
+                    ..Default::default()
+                })
+                .await
+                .expect("valid filter")
+                .len()
+        );
+    }
+
     pub async fn spend_spendable_payments<T>(storage: T)
     pub async fn spend_spendable_payments<T>(storage: T)
     where
     where
         T: Storage + Send + Sync,
         T: Storage + Send + Sync,
@@ -877,6 +1061,7 @@ pub mod test {
         let deposit = Transaction::new_external_deposit(
         let deposit = Transaction::new_external_deposit(
             "test reference".to_owned(),
             "test reference".to_owned(),
             "settled".into(),
             "settled".into(),
+            vec![],
             vec![(
             vec![(
                 account1.clone(),
                 account1.clone(),
                 usd.from_human("100.99").expect("valid amount"),
                 usd.from_human("100.99").expect("valid amount"),

+ 34 - 5
utxo/src/storage/sqlite/mod.rs

@@ -1,10 +1,10 @@
 //! SQLite storage layer for Verax
 //! SQLite storage layer for Verax
-use super::{Cursor, PrimaryFilter, ReceivedPaymentStatus};
+use super::{Cursor, ReceivedPaymentStatus};
 use crate::{
 use crate::{
     amount::AmountCents,
     amount::AmountCents,
     storage::{Error, Storage},
     storage::{Error, Storage},
     transaction::Revision,
     transaction::Revision,
-    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, RevId, TxId,
+    AccountId, Amount, Asset, BaseTx, Filter, PaymentFrom, PaymentId, PrimaryFilter, RevId, TxId,
 };
 };
 use borsh::from_slice;
 use borsh::from_slice;
 use futures::TryStreamExt;
 use futures::TryStreamExt;
@@ -201,6 +201,36 @@ where
             }
             }
             query.fetch_all(executor).await
             query.fetch_all(executor).await
         }
         }
+        PrimaryFilter::Status(statuses) => {
+            let sql = format!(
+                r#"
+                  SELECT
+                      "bt"."blob",
+                      "b"."blob"
+                  FROM
+                      "transactions" as "t",
+                      "base_transactions" as "bt",
+                      "revisions" as "b"
+                  WHERE
+                      "b"."status" IN ({})
+                      AND "t"."revision_id" = "b"."revision_id"
+                      AND "t"."transaction_id" = "bt"."transaction_id"
+                      {} {}
+                  ORDER BY "t"."created_at" DESC
+                  LIMIT {} OFFSET {}
+                  "#,
+                "?,".repeat(statuses.len()).trim_end_matches(","),
+                since,
+                until,
+                limit,
+                cursor.filter.skip
+            );
+            let mut query = sqlx::query(&sql);
+            for status in statuses.iter() {
+                query = query.bind(status.to_string());
+            }
+            query.fetch_all(executor).await
+        }
         PrimaryFilter::Stream => {
         PrimaryFilter::Stream => {
             let sql = format!(
             let sql = format!(
                 r#"
                 r#"
@@ -301,7 +331,7 @@ impl SQLite {
             "to" VARCHAR(64) NOT NULL,
             "to" VARCHAR(64) NOT NULL,
             "status" INT NOT NULL,
             "status" INT NOT NULL,
             "asset" VARCHAR(10) NOT NULL,
             "asset" VARCHAR(10) NOT NULL,
-            "cents" STRING NOT NULL,
+            "cents" BIGINT NOT NULL,
             "is_negative" INT DEFAULT '0',
             "is_negative" INT DEFAULT '0',
             "spent_by" VARCHAR(64) DEFAULT NULL,
             "spent_by" VARCHAR(64) DEFAULT NULL,
             "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
             "created_at" DATETIME DEFAULT CURRENT_TIMESTAMP,
@@ -320,8 +350,7 @@ impl SQLite {
         CREATE INDEX IF NOT EXISTS "sorted_account_transaction" ON "transaction_accounts" ("account_id", "id" desc);
         CREATE INDEX IF NOT EXISTS "sorted_account_transaction" ON "transaction_accounts" ("account_id", "id" desc);
         "#,
         "#,
         )
         )
-        .await
-        .expect("valid");
+        .await?;
         x.commit().await?;
         x.commit().await?;
         Ok(())
         Ok(())
     }
     }

+ 1 - 0
utxo/src/tests/deposit.rs

@@ -11,6 +11,7 @@ async fn pending_deposit_and_failure() {
             &source,
             &source,
             usd.from_human("30").expect("amount"),
             usd.from_human("30").expect("amount"),
             "processing".into(),
             "processing".into(),
+            vec![],
             "Test".to_owned(),
             "Test".to_owned(),
         )
         )
         .await
         .await

+ 10 - 3
utxo/src/tests/mod.rs

@@ -3,9 +3,10 @@ use crate::{
     AccountId, Amount, Error, Ledger, RevId, Status,
     AccountId, Amount, Error, Ledger, RevId, Status,
 };
 };
 use sqlx::sqlite::SqlitePoolOptions;
 use sqlx::sqlite::SqlitePoolOptions;
+use std::sync::Arc;
 
 
 #[allow(unused)]
 #[allow(unused)]
-pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Ledger<SQLite> {
+pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Arc<Ledger<SQLite>> {
     let pool = SqlitePoolOptions::new()
     let pool = SqlitePoolOptions::new()
         .max_connections(1)
         .max_connections(1)
         .idle_timeout(None)
         .idle_timeout(None)
@@ -19,7 +20,7 @@ pub async fn get_file_asset_manager_and_ledger<'a>(name: &str) -> Ledger<SQLite>
     Ledger::new(db.into())
     Ledger::new(db.into())
 }
 }
 
 
-pub async fn get_asset_manager_and_ledger() -> Ledger<SQLite> {
+pub async fn get_asset_manager_and_ledger() -> Arc<Ledger<SQLite>> {
     let pool = SqlitePoolOptions::new()
     let pool = SqlitePoolOptions::new()
         .max_connections(1)
         .max_connections(1)
         .idle_timeout(None)
         .idle_timeout(None)
@@ -51,7 +52,13 @@ where
     S: Storage + Send + Sync,
     S: Storage + Send + Sync,
 {
 {
     ledger
     ledger
-        .deposit(account_id, amount, "settled".into(), "Test".to_owned())
+        .deposit(
+            account_id,
+            amount,
+            "settled".into(),
+            vec![],
+            "Test".to_owned(),
+        )
         .await
         .await
         .expect("valid tx")
         .expect("valid tx")
         .revision_id
         .revision_id

+ 42 - 136
utxo/src/transaction/mod.rs

@@ -1,9 +1,6 @@
 use crate::{
 use crate::{
-    config::Config,
-    payment::PaymentTo,
-    status::InternalStatus,
-    storage::{Batch, ReceivedPaymentStatus, Storage},
-    AccountId, Amount, MaxLengthString, PaymentFrom, RevId, Status, TxId,
+    config::Config, payment::PaymentTo, storage::Storage, AccountId, Amount, FilterableValue,
+    MaxLengthString, PaymentFrom, RevId, Status, TxId,
 };
 };
 use chrono::{DateTime, TimeZone, Utc};
 use chrono::{DateTime, TimeZone, Utc};
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
@@ -123,6 +120,28 @@ impl Transaction {
         })
         })
     }
     }
 
 
+    /// Returns the filterable fields
+    pub fn get_filterable_fields(&self) -> Vec<FilterableValue> {
+        let mut filters = vec![
+            FilterableValue::Anything,
+            FilterableValue::Type(self.transaction.typ.clone()),
+            FilterableValue::TxId(self.id.clone()),
+            FilterableValue::Status(self.revision.status.clone()),
+            FilterableValue::Revision(self.revision_id.clone()),
+            FilterableValue::Type(self.typ),
+        ];
+
+        for account in self.accounts() {
+            filters.push(FilterableValue::Account(account));
+        }
+
+        for tag in &self.revision.tags {
+            filters.push(FilterableValue::Tag(tag.clone()));
+        }
+
+        filters
+    }
+
     /// Creates a new external deposit transaction
     /// Creates a new external deposit transaction
     ///
     ///
     /// All transactions must be balanced, same amounts that are spent should be
     /// All transactions must be balanced, same amounts that are spent should be
@@ -131,14 +150,16 @@ impl Transaction {
     pub fn new_external_deposit(
     pub fn new_external_deposit(
         reference: String,
         reference: String,
         status: Status,
         status: Status,
+        tags: Vec<Tag>,
         creates: Vec<(AccountId, Amount)>,
         creates: Vec<(AccountId, Amount)>,
     ) -> Result<Self, Error> {
     ) -> Result<Self, Error> {
         let creates = creates
         let creates = creates
             .into_iter()
             .into_iter()
             .map(|(to, amount)| PaymentTo { to, amount })
             .map(|(to, amount)| PaymentTo { to, amount })
             .collect();
             .collect();
-        let (transaction, revision) =
+        let (transaction, mut revision) =
             BaseTx::new(Vec::new(), creates, reference, Type::Deposit, status)?;
             BaseTx::new(Vec::new(), creates, reference, Type::Deposit, status)?;
+        revision.tags = tags;
         let revision_id = revision.rev_id()?;
         let revision_id = revision.rev_id()?;
 
 
         Ok(Self {
         Ok(Self {
@@ -172,15 +193,7 @@ impl Transaction {
     }
     }
 
 
     /// Updates the transaction tags
     /// Updates the transaction tags
-    pub async fn set_tags<S>(
-        self,
-        config: &Config<S>,
-        new_tags: Vec<Tag>,
-        reason: String,
-    ) -> Result<Self, Error>
-    where
-        S: Storage + Sync + Send,
-    {
+    pub fn set_tags(self, new_tags: Vec<Tag>, reason: String) -> Result<Self, Error> {
         let new_revision = Revision {
         let new_revision = Revision {
             transaction_id: self.revision.transaction_id,
             transaction_id: self.revision.transaction_id,
             changelog: reason,
             changelog: reason,
@@ -193,26 +206,24 @@ impl Transaction {
         let mut revisions = self.revisions;
         let mut revisions = self.revisions;
         revisions.push(revision_id.clone());
         revisions.push(revision_id.clone());
 
 
-        let mut new_transaction = Transaction {
+        Ok(Transaction {
             id: self.id,
             id: self.id,
             revisions,
             revisions,
             revision_id,
             revision_id,
             transaction: self.transaction,
             transaction: self.transaction,
             revision: new_revision,
             revision: new_revision,
-        };
-        new_transaction.persist(config).await?;
-        Ok(new_transaction)
+        })
     }
     }
 
 
     /// Prepares a new revision to change the transaction status
     /// Prepares a new revision to change the transaction status
     ///
     ///
     /// If the status transaction is not allowed, it will return an error.
     /// If the status transaction is not allowed, it will return an error.
     ///
     ///
-    /// The new transaction with revision is returned, which is already persisted. The previous
-    /// struct is consumed and the latest revision is preserved for historical purposes but it is no
-    /// longer the latest revision
+    /// The new transaction with revision is returned, which should be persisted. When it is
+    /// persisted, the previous struct is consumed and the latest revision is preserved for
+    /// historical purposes but it is no longer the latest revision
     #[inline]
     #[inline]
-    pub async fn change_status<S>(
+    pub fn change_status<S>(
         self,
         self,
         config: &Config<S>,
         config: &Config<S>,
         new_status: Status,
         new_status: Status,
@@ -233,20 +244,20 @@ impl Transaction {
             created_at: Utc::now(),
             created_at: Utc::now(),
         };
         };
         let revision_id = new_revision.rev_id()?;
         let revision_id = new_revision.rev_id()?;
+        let mut revisions = self.revisions;
+        revisions.push(revision_id.clone());
 
 
-        let mut new_transaction = Transaction {
+        Ok(Transaction {
             id: self.id,
             id: self.id,
-            revisions: self.revisions,
-            revision_id: revision_id.clone(),
+            revisions,
+            revision_id,
             transaction: self.transaction,
             transaction: self.transaction,
             revision: new_revision,
             revision: new_revision,
-        };
-        new_transaction.revisions.push(revision_id);
-        new_transaction.persist(config).await?;
-        Ok(new_transaction)
+        })
     }
     }
 
 
-    fn validate(&self) -> Result<(), Error> {
+    /// Validates the transaction and its revisions
+    pub fn validate(&self) -> Result<(), Error> {
         let rev_id = self.revision.rev_id()?;
         let rev_id = self.revision.rev_id()?;
         let tx_id = self.transaction.id()?;
         let tx_id = self.transaction.id()?;
         if self.revision_id != rev_id {
         if self.revision_id != rev_id {
@@ -261,109 +272,4 @@ impl Transaction {
         self.transaction.validate()?;
         self.transaction.validate()?;
         Ok(())
         Ok(())
     }
     }
-
-    #[inline]
-    async fn store_transaction<'a, S>(&mut self, batch: &mut S::Batch<'a>) -> Result<(), Error>
-    where
-        S: Storage + Sync + Send,
-    {
-        self.validate()?;
-        let spends = self.spends.iter().map(|x| x.id.clone()).collect::<Vec<_>>();
-        batch
-            .spend_payments(
-                &self.revision.transaction_id,
-                spends,
-                ReceivedPaymentStatus::Locked,
-            )
-            .await?;
-        batch
-            .create_payments(
-                &self.revision.transaction_id,
-                &self.creates,
-                ReceivedPaymentStatus::Locked,
-            )
-            .await?;
-
-        for account in self.accounts() {
-            batch
-                .relate_account_to_transaction(&self.revision.transaction_id, &account, self.typ)
-                .await?;
-        }
-        batch
-            .store_base_transaction(&self.revision.transaction_id, &self.transaction)
-            .await?;
-        Ok(())
-    }
-
-    /// Persists the changes done to this transaction object.
-    /// This method is not idempotent, and it will fail if the transaction if the requested update
-    /// is not allowed.
-    pub async fn persist<'a, S>(&mut self, config: &'a Config<S>) -> Result<(), Error>
-    where
-        S: Storage + Sync + Send,
-    {
-        self.validate()?;
-        let mut batch = config.storage.begin().await?;
-        if self.revision.previous.is_none() {
-            self.store_transaction::<S>(&mut batch).await?;
-        }
-
-        let (created_updated, spent_updated) =
-            match config.status.internal_type(&self.revision.status) {
-                InternalStatus::Reverted => {
-                    batch
-                        .update_transaction_payments(
-                            &self.id,
-                            ReceivedPaymentStatus::Failed,
-                            ReceivedPaymentStatus::Spendable,
-                        )
-                        .await?
-                }
-                InternalStatus::Spendable => {
-                    batch
-                        .update_transaction_payments(
-                            &self.id,
-                            ReceivedPaymentStatus::Spendable,
-                            ReceivedPaymentStatus::Spent,
-                        )
-                        .await?
-                }
-                _ => (self.creates.len(), self.spends.len()),
-            };
-
-        if self.creates.len() != created_updated || self.spends.len() != spent_updated {
-            return Err(Error::NoUpdate);
-        }
-
-        if config.status.is_spendable(&self.revision.status) {
-            batch
-                .update_transaction_payments(
-                    &self.id,
-                    ReceivedPaymentStatus::Spendable,
-                    ReceivedPaymentStatus::Spent,
-                )
-                .await?;
-        }
-
-        batch
-            .store_revision(&self.revision_id, &self.revision)
-            .await?;
-
-        batch
-            .tag_transaction(&self.id, &self.transaction, &self.revision.tags)
-            .await?;
-
-        batch
-            .update_transaction_revision(
-                &self.id,
-                &self.revision_id,
-                self.revision.previous.as_ref(),
-            )
-            .await
-            .expect("foo3");
-
-        batch.commit().await?;
-
-        Ok(())
-    }
 }
 }

+ 1 - 0
utxo/src/transaction/typ.rs

@@ -13,6 +13,7 @@ pub enum Error {
     PartialEq,
     PartialEq,
     PartialOrd,
     PartialOrd,
     Ord,
     Ord,
+    Hash,
     Eq,
     Eq,
     Serialize,
     Serialize,
     Deserialize,
     Deserialize,

+ 132 - 0
utxo/src/worker.rs

@@ -0,0 +1,132 @@
+//! Creates a worker thread and exposes a sender to communidate with
+use async_trait::async_trait;
+use chrono::Utc;
+use parking_lot::RwLock;
+use std::{
+    ops::Deref,
+    sync::{atomic::AtomicBool, Arc},
+    time::Duration,
+};
+use tokio::{
+    sync::mpsc::{channel, error::TrySendError, Sender},
+    time::sleep,
+};
+
+/// Time to awake the main thread to check if the parent struct is still in memory if it was dropped
+/// already. If it was dropped the main thread has to be stopped.
+const CHECK_WORKER_IN_SCOPE_MS: u64 = 50;
+/// The maximum size for buffering messages
+const WORKER_BUFFER_SIZE: usize = 1_000;
+/// The maximum time to be idle waiting for requests to be processed. After this is reached the main
+/// loop is stopped.
+const MAXIMUM_IDLE_TIME_SEC: i64 = 60;
+
+/// Worker trait
+///
+/// The worker trait has the definition of the code the worker has to perform in a different thread
+#[async_trait]
+pub trait Worker: Send + Sync {
+    type Payload: Send + Sync + Clone;
+
+    /// Method to be executed with a given task
+    async fn handler(&self, payload: Self::Payload);
+
+    /// Whether or not to process the request
+    fn process_request(&self) -> bool {
+        true
+    }
+}
+
+/// Worker manager
+///
+/// The worker manager manages the instances of the Worker trait, which is executed asynchronously
+/// in a separate thread from the send() context.
+///
+/// The logic of having one or more instances of the Worker trait is abstracted in this structure.
+#[derive(Debug)]
+pub struct WorkerManager<W: Worker> {
+    sender: RwLock<Option<Sender<W::Payload>>>,
+    is_running: Arc<AtomicBool>,
+    worker: Arc<W>,
+}
+
+impl<W: Worker> Drop for WorkerManager<W> {
+    fn drop(&mut self) {
+        self.is_running
+            .store(false, std::sync::atomic::Ordering::Release);
+    }
+}
+
+impl<W: Worker> Deref for WorkerManager<W> {
+    type Target = Arc<W>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.worker
+    }
+}
+
+impl<W: Worker + 'static> WorkerManager<W> {
+    /// Creates a new WorkerManager given a struct that implements the Worker trait
+    pub fn new(worker: W) -> Self {
+        Self {
+            sender: RwLock::new(None),
+            is_running: Arc::new(true.into()),
+            worker: Arc::new(worker),
+        }
+    }
+
+    fn start_background_worker(&self) -> Sender<W::Payload> {
+        let (sender, mut receiver) = channel(WORKER_BUFFER_SIZE);
+        let worker_for_thread = self.worker.clone();
+        let worker_in_scope = self.is_running.clone();
+        tokio::spawn(async move {
+            let mut last_time = Utc::now();
+            loop {
+                tokio::select! {
+                    Some(message) = receiver.recv() => {
+                        worker_for_thread.handler(message).await;
+                        last_time = Utc::now();
+                    }
+                    _ = sleep(Duration::from_millis(CHECK_WORKER_IN_SCOPE_MS))  => {}
+                }
+
+                if !worker_in_scope.load(std::sync::atomic::Ordering::Acquire)
+                    || (last_time - Utc::now()).num_seconds() > MAXIMUM_IDLE_TIME_SEC
+                {
+                    break;
+                }
+            }
+        });
+        sender
+    }
+
+    /// Sends a message to be processed in another thread
+    pub fn process(&self, message: W::Payload) {
+        if self.worker.process_request() {
+            let sender = self.sender.read();
+            match sender
+                .as_ref()
+                .map(|sender| sender.try_send(message.clone()))
+            {
+                None | Some(Err(TrySendError::Closed(_))) => {
+                    drop(sender);
+
+                    let mut sender = self.sender.write();
+
+                    if let Some(sender) = sender.as_ref() {
+                        // Check if another faster thread did not set sender to Some already
+                        let _ = sender.try_send(message.clone());
+                    } else {
+                        // Either there is no running worker thread and it is closed already, and
+                        // this thread is fastest and therefore will start the background worker and
+                        // will set the sender to Some
+                        let new_worker = self.start_background_worker();
+                        let _ = new_worker.try_send(message);
+                        *sender = Some(new_worker);
+                    }
+                }
+                _ => {}
+            }
+        }
+    }
+}