Cesar Rodas 1 tahun lalu
induk
melakukan
24fe3d5ec3
1 mengubah file dengan 98 tambahan dan 36 penghapusan
  1. 98 36
      utxo/src/main.rs

+ 98 - 36
utxo/src/main.rs

@@ -1,7 +1,8 @@
-use mlua::{Compiler, FromLua, Function, IntoLuaMulti, Lua, Value};
+use mlua::{Compiler, FromLua, Function, IntoLuaMulti, Lua, Table, UserData, Value};
 use sha2::{Digest, Sha256};
 use std::{
     collections::HashMap,
+    hash::Hash,
     sync::{
         atomic::{AtomicU16, Ordering},
         Arc,
@@ -30,6 +31,35 @@ where
     receiver: Arc<Mutex<Receiver<I, R>>>,
 }
 
+pub enum DbState {
+    Balances,
+    Accounts,
+    Transactions,
+    Other(String),
+}
+
+impl From<String> for DbState {
+    fn from(s: String) -> Self {
+        match s.as_str() {
+            "balances" => Self::Balances,
+            "accounts" => Self::Accounts,
+            "transactions" => Self::Transactions,
+            _ => Self::Other(s),
+        }
+    }
+}
+
+pub struct MetaState(Vec<DbState>);
+
+impl UserData for MetaState {
+    fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
+        methods.add_method_mut("__index", |_, _this, (_, key): (Table<'lua>, String)| {
+            _this.0.push(key.into());
+            Ok(1)
+        });
+    }
+}
+
 impl<I, R> Program<I, R>
 where
     for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
@@ -46,6 +76,24 @@ where
         }
     }
 
+    /// Returns a new Lua VM and a list of all the global variables to be
+    /// persisted and read from the storage engine. Since lua is a dynamic
+    /// language, other state variables may be read/updated dynamically, which
+    /// is fine, this list is just for the initial state and any potential
+    /// optimization.
+    fn get_lua_vm(bytecode: &[u8]) -> (Vec<DbState>, Lua) {
+        let lua = Lua::new();
+        lua.sandbox(true).unwrap();
+        let globals = lua.globals();
+        globals.set("require", Value::Nil).unwrap();
+
+        lua.load(bytecode).exec().unwrap();
+
+        drop(globals);
+
+        (vec![], lua)
+    }
+
     fn spawn(
         bytecode: Vec<u8>,
         instances: Arc<AtomicU16>,
@@ -56,16 +104,11 @@ where
             return;
         }
 
-        let pid = instances.fetch_add(1, Ordering::Relaxed).to_string();
+        instances.fetch_add(1, Ordering::Relaxed);
         let max_timeout = Duration::from_secs(30);
-        //let running = &self.running;
 
         tokio::task::spawn_blocking(move || {
-            let lua = Lua::new();
-            lua.globals().raw_remove("require").unwrap();
-            let _ = lua.globals().set("pid", pid);
-            lua.load(&bytecode).exec().unwrap();
-
+            let (_, lua) = Self::get_lua_vm(&bytecode);
             let f: Function = lua.globals().get("add").unwrap();
 
             loop {
@@ -109,17 +152,23 @@ where
 }
 
 #[derive(Debug)]
-pub struct VirtualMachines {
-    vms: RwLock<HashMap<ProgramId, Program<(i64, i64), i64>>>,
+pub struct VirtualMachines<K: Hash + Eq> {
+    vms: RwLock<HashMap<K, Program<(i64, i64), i64>>>,
 }
 
-impl VirtualMachines {
-    pub async fn register_program(&self, program: &str) -> ProgramId {
-        self.register_opcodes(Compiler::new().compile(program))
+impl<K: Hash + Eq> VirtualMachines<K> {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self {
+            vms: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub async fn register_program(&self, name: K, program: &str) -> (ProgramId, bool) {
+        self.register_opcodes(name, Compiler::new().compile(program))
             .await
     }
 
-    pub async fn exec(&self, id: &ProgramId) -> Option<i64> {
+    pub async fn exec(&self, id: &K) -> Option<i64> {
         if let Some(vm) = self.vms.read().await.get(id) {
             Some(vm.exec((22, 33)).await)
         } else {
@@ -127,32 +176,27 @@ impl VirtualMachines {
         }
     }
 
-    pub async fn register_opcodes(&self, opcodes: Vec<u8>) -> ProgramId {
+    pub async fn register_opcodes(&self, name: K, opcodes: Vec<u8>) -> (ProgramId, bool) {
         let mut vms = self.vms.write().await;
 
         let mut hasher = Sha256::new();
         hasher.update(&opcodes);
-        let id: ProgramId = hasher.finalize().into();
-
-        if !vms.contains_key(&id) {
-            vms.insert(id.clone(), Program::new(opcodes));
-        }
-        id
+        (
+            hasher.finalize().into(),
+            vms.insert(name, Program::new(opcodes)).is_some(),
+        )
     }
-}
 
-impl Default for VirtualMachines {
-    fn default() -> Self {
-        Self {
-            vms: RwLock::new(HashMap::new()),
-        }
+    pub async fn shutdown(&self) {
+        let mut vms = self.vms.write().await;
+        vms.clear();
     }
 }
 
 #[tokio::main]
 async fn main() {
     use std::{sync::Arc, time::Instant};
-    async fn do_loop(vms: Arc<VirtualMachines>, id: ProgramId) {
+    async fn do_loop(vms: Arc<VirtualMachines<String>>) {
         // Create N threads to execute the Lua code in parallel
         let num_threads = 400;
         let (tx, mut rx) = mpsc::channel(num_threads);
@@ -162,7 +206,7 @@ async fn main() {
 
             tokio::spawn(async move {
                 let start_time = Instant::now();
-                let result = vm.exec(&id).await;
+                let result = vm.exec(&"foo".to_owned()).await;
 
                 // Send the result back to the main thread
                 let _ = tx_clone.send(result).await;
@@ -189,22 +233,40 @@ async fn main() {
         }
     }
 
-    let vms = Arc::new(VirtualMachines::default());
+    let vms = VirtualMachines::new();
 
     // Compile Lua code
     let code = r#"
         calls = 0
+        pid = 0
         function add(a, b)
             calls = calls + 1
-            print("Call " .. pid .. " "  .. calls)
+            print("Call from old " .. pid .. " "  .. calls)
             return a + b
         end
         print("hello world " .. pid)
     "#;
 
-    let id = vms.register_program(code).await;
-    do_loop(vms.clone(), id).await;
-    tokio::time::sleep(Duration::from_secs(30)).await;
-    do_loop(vms.clone(), id).await;
-    tokio::time::sleep(Duration::from_secs(41)).await;
+    let _ = vms.register_program("foo".to_owned(), code).await;
+    do_loop(vms.clone()).await;
+
+    let code = r#"
+        calls = 0
+        pid = 1
+        function add(a, b)
+            calls = calls + 1
+            print("Call from new " .. pid .. " "  .. calls)
+            return a + b
+        end
+        print("hello world " .. pid)
+    "#;
+    let y = vms.register_program("foo".to_owned(), code).await;
+    println!("{} {:?}", "foo", y);
+    tokio::time::sleep(Duration::from_secs(3)).await;
+
+    do_loop(vms.clone()).await;
+
+    vms.shutdown().await;
+
+    tokio::time::sleep(Duration::from_secs(1)).await;
 }