Cesar Rodas 1 سال پیش
والد
کامیت
3e7dde91cc
4فایلهای تغییر یافته به همراه638 افزوده شده و 0 حذف شده
  1. 291 0
      utxo/src/lua.rs
  2. 115 0
      utxo/src/vm/program.rs
  3. 101 0
      utxo/src/vm/state/memory.rs
  4. 131 0
      utxo/src/vm/state/mod.rs

+ 291 - 0
utxo/src/lua.rs

@@ -0,0 +1,291 @@
+//! Implements the Lua API for the UTXO contract.
+use crate::vm::{Error, Instance, Storage, Value, Variable};
+use futures::executor::block_on;
+use mlua::{Compiler, Lua, Table, Value as luaValue, Variadic};
+use std::{collections::HashMap, sync::Arc};
+
+impl TryFrom<&luaValue<'_>> for Value {
+    type Error = String;
+
+    fn try_from(value: &luaValue<'_>) -> Result<Self, Self::Error> {
+        match value {
+            luaValue::Nil => Ok(Value::Nil),
+            luaValue::Boolean(b) => Ok(Value::Boolean(*b)),
+            luaValue::Integer(i) => Ok(Value::Integer((*i).into())),
+            luaValue::Number(n) => Ok(Value::Number(*n)),
+            luaValue::String(s) => Ok(Value::String(s.to_str().unwrap().to_owned())),
+            luaValue::Table(t) => {
+                let mut map = HashMap::new();
+                let mut iter = t.clone().pairs::<String, luaValue>().enumerate();
+                let mut is_vector = true;
+                while let Some((id, Ok((k, v)))) = iter.next() {
+                    if id.checked_add(1) != k.parse().ok() {
+                        is_vector = false;
+                    }
+                    map.insert(k, v.as_ref().try_into()?);
+                }
+
+                Ok(if is_vector {
+                    let mut values = map
+                        .into_iter()
+                        .map(|(k, v)| k.parse().map(|k| (k, v)))
+                        .collect::<Result<Vec<(usize, Value)>, _>>()
+                        .unwrap();
+
+                    values.sort_by(|(a, _), (b, _)| a.cmp(b));
+
+                    Value::Vector(values.into_iter().map(|(_, v)| v).collect())
+                } else {
+                    Value::HashMap(map)
+                })
+            }
+            x => Err(format!("Invalid type: {:?}", x)),
+        }
+    }
+}
+
+/// Implements the Lua Virtual machine support for the ledger database
+pub struct LuaVM<X: Storage + 'static> {
+    state: Arc<X>,
+    opcodes: Vec<u8>,
+}
+
+impl<X: Storage + 'static> LuaVM<X> {
+    #[inline]
+    fn var_value_to_lua_val(lua: &Lua, value: Value) -> mlua::Result<luaValue> {
+        match value {
+            Value::Nil => Ok(luaValue::Nil),
+            Value::Boolean(b) => Ok(luaValue::Boolean(b)),
+            Value::Integer(i) => Ok(luaValue::Integer(i.try_into().unwrap())),
+            Value::Number(n) => Ok(luaValue::Number(n)),
+            Value::String(s) => Ok(luaValue::String(lua.create_string(&s)?)),
+            Value::HashMap(map) => {
+                let table = lua.create_table()?;
+                for (k, v) in map {
+                    table.set(k, Self::var_value_to_lua_val(lua, v)?)?;
+                }
+                Ok(luaValue::Table(table))
+            }
+            Value::ErrorType(e) => Err(mlua::Error::RuntimeError(e.to_string())),
+            _ => Err(mlua::Error::RuntimeError("Invalid type".into())),
+        }
+    }
+
+    #[inline]
+    fn crate_meta_variables(lua: &Lua, meta_variable: Variable<'static>) -> mlua::Result<()> {
+        let getter = lua.create_function(move |_, (_, key): (Table, String)| {
+            println!("read only variable: {:?}.{}", meta_variable, key);
+            Ok(1)
+        })?;
+
+        let setter =
+            lua.create_function(move |_, (_, key, value): (Table, String, luaValue)| {
+                Result::<String, _>::Err(mlua::Error::RuntimeError(format!(
+                    "read only variable: {:?}.{} = {:?}",
+                    meta_variable, key, value
+                )))
+            })?;
+
+        let table = lua.create_table()?;
+        let meta_table = lua.create_table()?;
+        meta_table.set("__index", getter)?;
+        meta_table.set("__newindex", setter)?;
+        table.set_metatable(Some(meta_table));
+        lua.globals().raw_set(meta_variable.name(), table)?;
+        Ok(())
+    }
+
+    #[inline]
+    fn inject_dynamic_global_state(
+        lua: &Lua,
+        storage: Arc<X>,
+        program_name: String,
+        instance: usize,
+    ) -> mlua::Result<Option<Table>> {
+        lua.set_app_data((program_name, storage));
+
+        let getter = lua.create_function(move |lua, (global, key): (Table, String)| {
+            match global.raw_get::<_, luaValue>(key.clone())? {
+                luaValue::Nil => (),
+                local_value => return Ok(local_value),
+            };
+            let (program_name, storage) = lua
+                .app_data_ref::<(String, Arc<X>)>()
+                .ok_or(mlua::Error::MismatchedRegistryKey)?
+                .clone();
+
+            let key = Variable::new(&program_name, &key);
+            // LuaVM does not like being de-scheduled to run async tasks, any async task must
+            // be executed on the same thread
+            let value = block_on(async move { storage.get(instance, key).await });
+            Self::var_value_to_lua_val(lua, value)
+        })?;
+        let setter = lua.create_function(
+            move |lua, (global, key, value): (Table, String, luaValue)| {
+                let (program_name, storage) = lua
+                    .app_data_ref::<(String, Arc<X>)>()
+                    .ok_or(mlua::Error::MismatchedRegistryKey)?
+                    .clone();
+                let value: Value = if let Ok(value) = value.as_ref().try_into() {
+                    value
+                } else {
+                    return global.raw_set(key, value);
+                };
+                let key = Variable::new(&program_name, &key);
+                // LuaVM does not like being de-scheduled to run async tasks, any async task must
+                // be executed on the same thread
+                block_on(async move {
+                    storage.set(instance, key, value).await;
+                    Ok(())
+                })
+            },
+        )?;
+
+        let metatable = lua.create_table()?;
+        metatable.raw_set("__index", getter)?;
+        metatable.raw_set("__newindex", setter)?;
+
+        Ok(Some(metatable))
+    }
+}
+
+#[async_trait::async_trait]
+impl<X: Storage + 'static> Instance<X> for LuaVM<X> {
+    async fn new(state: Arc<X>, code: &str) -> Result<Self, Error>
+    where
+        Self: Sized,
+    {
+        Ok(Self {
+            state,
+            opcodes: Compiler::new().compile(code),
+        })
+    }
+
+    async fn run(
+        &self,
+        execution_id: usize,
+        program_name: &str,
+        args: Vec<Value>,
+    ) -> Result<Value, Error> {
+        let lua = Lua::new();
+        let globals = lua.globals();
+
+        let require = lua
+            .create_function(|_, (_,): (String,)| -> mlua::Result<()> {
+                Err(mlua::Error::RuntimeError("require is not allowed".into()))
+            })
+            .map_err(|e| Error::Runtime(e.to_string()))?;
+
+        globals.set_metatable(
+            Self::inject_dynamic_global_state(
+                &lua,
+                self.state.clone(),
+                program_name.to_owned(),
+                execution_id,
+            )
+            .map_err(|e| Error::Runtime(e.to_string()))?,
+        );
+
+        for val in [
+            Variable::Accounts,
+            Variable::Balances,
+            Variable::Transactions,
+            Variable::Payments,
+        ] {
+            Self::crate_meta_variables(&lua, val).map_err(|e| Error::Runtime(e.to_string()))?;
+        }
+
+        // remove external require
+        globals
+            .set("require", require)
+            .map_err(|e| Error::Runtime(e.to_string()))?;
+        drop(globals);
+
+        let args = Variadic::from_iter(
+            args.into_iter()
+                .map(|x| Self::var_value_to_lua_val(&lua, x))
+                .collect::<Result<Vec<_>, _>>()
+                .map_err(|e| Error::Runtime(e.to_string()))?,
+        );
+
+        let result: luaValue = lua
+            .load(&self.opcodes)
+            .call(args)
+            .map_err(|e| Error::Runtime(e.to_string()))?;
+
+        result.as_ref().try_into().map_err(Error::Runtime)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test() {
+        let storage = Arc::new(crate::vm::memory::Storage::default());
+        let vm = LuaVM::new(
+            storage.clone(),
+            r#"
+            subscriptions = {}
+            local arg = {...}
+            function update_state()
+                set = "foo " .. arg[1]
+                return arg[1]
+            end
+
+            return update_state()
+            "#,
+        )
+        .await
+        .unwrap();
+
+        let v = vm.run(0, "test", vec![Value::Integer(2)]).await.unwrap();
+        assert_eq!(v.as_integer(), Some(2));
+
+        let vm = LuaVM::new(
+            storage.clone(),
+            r#"
+            return set
+            "#,
+        )
+        .await
+        .unwrap();
+
+        let v = vm.run(0, "test", vec![]).await.unwrap();
+        assert_eq!(v.as_str(), Some("foo 2"));
+
+        let vm = LuaVM::new(
+            storage.clone(),
+            r#"
+            return balances["foo"]
+            "#,
+        )
+        .await
+        .unwrap();
+        let v = vm.run(0, "test", vec![]).await.unwrap();
+        assert_eq!(v.as_integer(), Some(1));
+    }
+
+    #[tokio::test]
+    async fn require_not_allowed() {
+        let storage = Arc::new(crate::vm::memory::Storage::default());
+        let vm = LuaVM::new(
+            storage.clone(),
+            r#"
+            require("foo")
+            "#,
+        )
+        .await
+        .unwrap();
+
+        let r = vm.run(0, "test", vec![]).await;
+        assert!(r.is_err());
+        assert!(r
+            .unwrap_err()
+            .to_string()
+            .find("require is not allowed")
+            .is_some());
+    }
+}

+ 115 - 0
utxo/src/vm/program.rs

@@ -0,0 +1,115 @@
+//! Abstract definition of a virtual machine and their programs to be executed
+//! inside the ledger for each transaction
+use super::{Error, Storage, Value};
+use std::sync::Arc;
+use tokio::sync::{Mutex, RwLock};
+
+/// Trait to implement a program and how it will be executed
+#[async_trait::async_trait]
+pub trait Instance<S: Storage + 'static>: Send + Sync {
+    /// Creates a new instance of the program.
+    async fn new(state: Arc<S>, code: &str) -> Result<Self, Error>
+    where
+        Self: Sized;
+
+    /// Returns the code of the program with a given set of arguments. The
+    /// return is the execution ID (which is unique per execution) and the
+    /// result of the execution.
+    async fn run(
+        &self,
+        execution_id: usize,
+        program_name: &str,
+        input: Vec<Value>,
+    ) -> Result<Value, Error>;
+}
+
+pub struct Program<S: Storage + 'static, I: Instance<S>> {
+    state: Arc<S>,
+    code: String,
+    name: String,
+    instances: RwLock<Vec<I>>,
+    locks: Mutex<Vec<bool>>,
+}
+
+impl<S: Storage + 'static, I: Instance<S>> Program<S, I> {
+    pub fn new(state: Arc<S>, name: &str, code: &str) -> Result<Self, Error> {
+        Ok(Self {
+            state,
+            code: code.to_owned(),
+            name: name.to_owned(),
+            instances: RwLock::new(vec![]),
+            locks: Mutex::new(vec![]),
+        })
+    }
+
+    async fn get_available_instance(&self) -> Result<usize, Error> {
+        let mut slot_id = None;
+        let mut locks = self.locks.lock().await;
+        let total_instances = locks.len();
+        for (id, busy) in locks.iter_mut().enumerate() {
+            if !*busy {
+                slot_id = Some(id);
+                *busy = true;
+                break;
+            }
+        }
+        drop(locks);
+
+        Ok(if let Some(slot_id) = slot_id {
+            // we found a free slot, it is already locked, return the id
+            slot_id
+        } else {
+            // no free slot
+            if total_instances < 20 {
+                // there are few instances, create a new one
+                let mut instances = self.instances.write().await;
+                instances.push(I::new(self.state.clone(), &self.code).await?);
+                self.locks.lock().await.push(true);
+                total_instances
+            } else {
+                // wait until a slot is available
+                #[allow(unused_assignments)]
+                let mut slot_id = 0;
+                loop {
+                    let mut slot_id_internal = None;
+                    let mut locks = self.locks.lock().await;
+                    for (id, busy) in locks.iter_mut().enumerate() {
+                        if !*busy {
+                            slot_id_internal = Some(id);
+                            *busy = true;
+                            break;
+                        }
+                    }
+                    drop(locks);
+                    if let Some(slot_id_internal) = slot_id_internal {
+                        slot_id = slot_id_internal;
+                        break;
+                    }
+                    tokio::time::sleep(tokio::time::Duration::from_micros(1)).await;
+                }
+                slot_id
+            }
+        })
+    }
+
+    pub async fn run(&self, execution_id: usize, input: Vec<Value>) -> Result<Value, Error> {
+        // find a free instance and lock it for execution
+        let instance_id = self.get_available_instance().await?;
+
+        // execute program
+        let result = self
+            .instances
+            .read()
+            .await
+            .get(instance_id)
+            .ok_or(Error::InstanceNotFound(self.name.clone(), instance_id))?
+            .run(execution_id, &self.name, input)
+            .await;
+
+        // remove lock from instance
+        let mut locks = self.locks.lock().await;
+        locks[instance_id] = false;
+
+        result
+    }
+}

+ 101 - 0
utxo/src/vm/state/memory.rs

@@ -0,0 +1,101 @@
+use super::{Value, Variable};
+use std::collections::HashMap;
+use tokio::{
+    sync::{Mutex, RwLock},
+    time::Duration,
+};
+
+#[derive(Debug)]
+pub struct Storage {
+    storage: RwLock<HashMap<String, Value>>,
+    locks: RwLock<HashMap<String, usize>>,
+    var_locked_by_instance: Mutex<HashMap<usize, Vec<String>>>,
+}
+
+impl Default for Storage {
+    fn default() -> Self {
+        Self {
+            storage: RwLock::new(HashMap::new()),
+            locks: RwLock::new(HashMap::new()),
+            var_locked_by_instance: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl super::Storage for Storage {
+    async fn lock<'a>(&'a self, instance: usize, var: &'a Variable<'a>) {
+        let locks = self.locks.read().await;
+        let name = var.name();
+
+        if locks.get(&name).map(|v| *v) == Some(instance) {
+            // The variable is already locked by this instance
+            return;
+        }
+
+        drop(locks);
+
+        loop {
+            // wait here while the locked is not null or it is locked by another
+            // instance
+            let locks = self.locks.read().await;
+            let var_lock = locks.get(&name).map(|v| *v);
+            if var_lock.is_none() || var_lock == Some(instance) {
+                break;
+            }
+            drop(locks);
+            tokio::time::sleep(Duration::from_micros(10)).await;
+        }
+
+        loop {
+            let mut locks = self.locks.write().await;
+            let var_lock = locks.get(&name).map(|v| *v);
+
+            if !var_lock.is_none() {
+                if var_lock == Some(instance) {
+                    break;
+                }
+                drop(locks);
+                tokio::time::sleep(Duration::from_micros(10)).await;
+                continue;
+            }
+
+            locks.insert(name.to_owned(), instance);
+
+            let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+            if let Some(vars) = vars_by_instance.get_mut(&instance) {
+                vars.push(name.to_owned());
+            } else {
+                vars_by_instance.insert(instance, vec![name.to_owned()]);
+            }
+        }
+    }
+
+    async fn get<'a>(&'a self, instance: usize, var: Variable<'a>) -> Value {
+        self.lock(instance, &var).await;
+        self.storage
+            .read()
+            .await
+            .get(&var.name())
+            .cloned()
+            .unwrap_or(Value::Nil)
+    }
+
+    async fn set<'a>(&'a self, instance: usize, var: Variable<'a>, value: Value) {
+        self.lock(instance, &var).await;
+        self.storage.write().await.insert(var.name(), value);
+    }
+
+    async fn shutdown(&self, execution_id: usize) {
+        let mut vars_by_instance = self.var_locked_by_instance.lock().await;
+        let mut locks = self.locks.write().await;
+
+        if let Some(vars) = vars_by_instance.remove(&execution_id) {
+            for var in vars {
+                if locks.get(&var).map(|v| *v) == Some(execution_id) {
+                    locks.remove(&var);
+                }
+            }
+        }
+    }
+}

+ 131 - 0
utxo/src/vm/state/mod.rs

@@ -0,0 +1,131 @@
+use std::collections::HashMap;
+
+#[cfg(test)]
+pub mod memory;
+
+/// The value of the data to store
+#[derive(Debug, Clone)]
+pub enum Value {
+    /// The Lua value `nil`.
+    Nil,
+    /// The Lua value `true` or `false`.
+    Boolean(bool),
+    /// Integer number
+    Integer(i128),
+    /// A floating point number.
+    Number(f64),
+    /// String
+    String(String),
+    /// A vector
+    Vector(Vec<Value>),
+    /// Dynamic value tied to a prefix, each values are retrieved one by one and
+    /// the locking is done on each element value
+    DynamicHashMap(String),
+    /// A
+    HashMap(HashMap<String, Value>),
+    /// An error
+    ErrorType(String),
+}
+
+impl Value {
+    /// Gets value as a &str if possible
+    pub fn as_str(&self) -> Option<&str> {
+        match self {
+            Self::String(s) => Some(s),
+            _ => None,
+        }
+    }
+
+    /// Gets value as a i128 if possible
+    pub fn as_integer(&self) -> Option<i128> {
+        match self {
+            Self::Integer(i) => Some(*i),
+            _ => None,
+        }
+    }
+
+    /// Is the current value a hashmap. Hashmap are stored in a flat structure
+    /// in the storage layer, each value is locked individually instead of the
+    /// whole hashmap
+    pub fn is_hashmap(&self) -> bool {
+        match self {
+            Self::HashMap(_) | Self::DynamicHashMap(_) => true,
+            Self::Vector(x) => x.is_empty(),
+            _ => false,
+        }
+    }
+}
+
+/// Variable name
+#[derive(Debug, Clone, Eq, PartialEq, Copy)]
+pub enum Variable<'a> {
+    /// Balance: This variable is shared between all programs
+    Balances,
+    /// Accounts: This variable is shared between all programs
+    Accounts,
+    /// Transactions: This variable is shared between all programs
+    Transactions,
+    /// Payments: This variable is shared between all programs
+    Payments,
+    /// Dynamic
+    Dynamic(&'a Variable<'a>, &'a str),
+    /// Any local variable, inside the namespace of the current program
+    Scalar(&'a str, &'a str),
+}
+
+impl<'a> Variable<'a> {
+    /// Creates a new variable
+    ///
+    /// The variable is bound to a namespace (the program) name unless it is an
+    /// special global / share variable such as balances, accounts, transactions
+    /// or payments.
+    pub fn new(namespace: &'a str, name: &'a str) -> Self {
+        match namespace {
+            "balances" => Self::Balances,
+            "accounts" => Self::Accounts,
+            "transactions" => Self::Transactions,
+            "payments" => Self::Payments,
+            _ => Self::Scalar(namespace, name),
+        }
+    }
+
+    /// Whether the current variable is a meta variable (balances, accounts, transactions or
+    /// payments)
+    pub fn is_meta_variable(&self) -> bool {
+        matches!(
+            self,
+            Self::Balances | Self::Accounts | Self::Transactions | Self::Payments
+        )
+    }
+
+    /// Returns the name of the variable as an String
+    pub fn name(&self) -> String {
+        match self {
+            Self::Balances => "balances".to_owned(),
+            Self::Accounts => "accounts".to_owned(),
+            Self::Transactions => "transactions".to_owned(),
+            Self::Payments => "payments".to_owned(),
+            Self::Dynamic(parent, name) => format!("{}.{}", parent.name(), name),
+            Self::Scalar(ns, name) => format!("{}.{}", ns, name),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+/// The storage trait to implement a storage engine
+pub trait Storage: Send + Sync {
+    /// Locks a variable for a given execution_id, it will be automatically
+    /// released when shutdown(execution_id is being called)
+    async fn lock<'a>(&'a self, execution_id: usize, var: &'a Variable<'a>);
+
+    /// Gets a single value from the storage. The returned value should be locked by the
+    /// execution_id
+    async fn get<'a>(&'a self, execution_id: usize, variable: Variable<'a>) -> Value;
+
+    /// Sets a value in the storage. The variable should be locked by the
+    /// execution_id before updating
+    async fn set<'a>(&'a self, execution_id: usize, var: Variable<'a>, value: Value);
+
+    /// release resources)
+    async fn shutdown(&self, execution_id: usize);
+}