|
@@ -1,4 +1,4 @@
|
|
|
-use mlua::{Compiler, FromLua, Function, IntoLuaMulti, Lua, Table, UserData, Value};
|
|
|
+use mlua::{Compiler, FromLua, Function, IntoLua, IntoLuaMulti, Lua, Table, UserData, Value};
|
|
|
use sha2::{Digest, Sha256};
|
|
|
use std::{
|
|
|
collections::HashMap,
|
|
@@ -15,57 +15,84 @@ use tokio::{
|
|
|
|
|
|
pub type ProgramId = [u8; 32];
|
|
|
|
|
|
-type Sender<I, R> = mpsc::Sender<(I, oneshot::Sender<R>)>;
|
|
|
-type Receiver<I, R> = mpsc::Receiver<(I, oneshot::Sender<R>)>;
|
|
|
+#[async_trait::async_trait]
|
|
|
+pub trait VarStorage: Send + Sync {
|
|
|
+ async fn get(&self, var: &Variable) -> VarValue;
|
|
|
+ async fn set(&mut self, var: &Variable, value: VarValue);
|
|
|
+}
|
|
|
+
|
|
|
+type Sender<X, I, R> = mpsc::Sender<(Arc<X>, I, oneshot::Sender<R>)>;
|
|
|
+type Receiver<X, I, R> = mpsc::Receiver<(Arc<X>, I, oneshot::Sender<R>)>;
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
-pub struct Program<I, R>
|
|
|
+pub struct Program<X, I, R>
|
|
|
where
|
|
|
+ X: VarStorage,
|
|
|
for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
{
|
|
|
opcodes: Vec<u8>,
|
|
|
instances: Arc<AtomicU16>,
|
|
|
running: Arc<AtomicU16>,
|
|
|
- sender: Sender<I, R>,
|
|
|
- receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
+ sender: Sender<X, I, R>,
|
|
|
+ receiver: Arc<Mutex<Receiver<X, I, R>>>,
|
|
|
}
|
|
|
|
|
|
-pub enum DbState {
|
|
|
+pub enum Variable {
|
|
|
Balances,
|
|
|
Accounts,
|
|
|
Transactions,
|
|
|
+ Payments,
|
|
|
Other(String),
|
|
|
}
|
|
|
|
|
|
-impl From<String> for DbState {
|
|
|
+pub enum VarValue {
|
|
|
+ Scalar(String),
|
|
|
+ Nil,
|
|
|
+}
|
|
|
+
|
|
|
+impl From<String> for Variable {
|
|
|
fn from(s: String) -> Self {
|
|
|
match s.as_str() {
|
|
|
"balances" => Self::Balances,
|
|
|
"accounts" => Self::Accounts,
|
|
|
"transactions" => Self::Transactions,
|
|
|
+ "payments" => Self::Payments,
|
|
|
_ => Self::Other(s),
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub struct MetaState(Vec<DbState>);
|
|
|
+impl Variable {
|
|
|
+ pub fn name<'a>(&'a self) -> &'a str {
|
|
|
+ match self {
|
|
|
+ Self::Balances => "balances",
|
|
|
+ Self::Accounts => "accounts",
|
|
|
+ Self::Transactions => "transactions",
|
|
|
+ Self::Payments => "payments",
|
|
|
+ Self::Other(s) => s,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+pub struct MetaState(Vec<Variable>);
|
|
|
|
|
|
impl UserData for MetaState {
|
|
|
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
|
|
methods.add_method_mut("__index", |_, _this, (_, key): (Table<'lua>, String)| {
|
|
|
- _this.0.push(key.into());
|
|
|
- Ok(1)
|
|
|
+ panic!("cesar {}", key);
|
|
|
+ Ok(format!("dyn value {}", key))
|
|
|
});
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl<I, R> Program<I, R>
|
|
|
+impl<X, I, R> Program<X, I, R>
|
|
|
where
|
|
|
+ for<'lua> X: VarStorage + 'lua,
|
|
|
for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
{
|
|
|
- pub fn new(opcodes: Vec<u8>) -> Program<I, R> {
|
|
|
+ pub fn new(opcodes: Vec<u8>) -> Program<X, I, R> {
|
|
|
let (sender, receiver) = mpsc::channel(100);
|
|
|
Self {
|
|
|
instances: Arc::new(AtomicU16::new(0)),
|
|
@@ -81,13 +108,40 @@ where
|
|
|
/// language, other state variables may be read/updated dynamically, which
|
|
|
/// is fine, this list is just for the initial state and any potential
|
|
|
/// optimization.
|
|
|
- fn get_lua_vm(bytecode: &[u8]) -> (Vec<DbState>, Lua) {
|
|
|
+ fn get_lua_vm(bytecode: &[u8]) -> (Vec<Variable>, Lua) {
|
|
|
let lua = Lua::new();
|
|
|
lua.sandbox(true).unwrap();
|
|
|
let globals = lua.globals();
|
|
|
- globals.set("require", Value::Nil).unwrap();
|
|
|
|
|
|
+ let require = lua
|
|
|
+ .create_function(|_, (name,): (String,)| -> mlua::Result<()> {
|
|
|
+ Err(mlua::Error::RuntimeError("require is not allowed".into()))
|
|
|
+ })
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ let set = lua
|
|
|
+ .create_async_function(|_, (_, key): (Table, String)| async move {
|
|
|
+ let x = Ok(format!("foo bar -> {}", key));
|
|
|
+ x
|
|
|
+ })
|
|
|
+ .unwrap();
|
|
|
+ let metatable = lua.create_table().unwrap();
|
|
|
+ metatable.raw_set("__index", set).unwrap();
|
|
|
+
|
|
|
+ // remove external require
|
|
|
+ globals.set("require", require).unwrap();
|
|
|
+ match globals.get("_G") {
|
|
|
+ Ok(Value::Table(table)) => {
|
|
|
+ table.set_readonly(false);
|
|
|
+ table.set_metatable(Some(metatable));
|
|
|
+ table.set_readonly(true);
|
|
|
+ }
|
|
|
+ _ => {}
|
|
|
+ };
|
|
|
+
|
|
|
+ // load main program
|
|
|
lua.load(bytecode).exec().unwrap();
|
|
|
+ panic!("cesdar");
|
|
|
|
|
|
drop(globals);
|
|
|
|
|
@@ -98,32 +152,35 @@ where
|
|
|
bytecode: Vec<u8>,
|
|
|
instances: Arc<AtomicU16>,
|
|
|
running: Arc<AtomicU16>,
|
|
|
- receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
+ receiver: Arc<Mutex<Receiver<X, I, R>>>,
|
|
|
) {
|
|
|
if instances.load(Ordering::Relaxed) > 10 {
|
|
|
return;
|
|
|
}
|
|
|
+ let (_, lua) = Self::get_lua_vm(&bytecode);
|
|
|
|
|
|
instances.fetch_add(1, Ordering::Relaxed);
|
|
|
let max_timeout = Duration::from_secs(30);
|
|
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
|
- let (_, lua) = Self::get_lua_vm(&bytecode);
|
|
|
- let f: Function = lua.globals().get("add").unwrap();
|
|
|
-
|
|
|
loop {
|
|
|
if let Ok(mut queue) =
|
|
|
futures::executor::block_on(timeout(max_timeout, receiver.lock()))
|
|
|
{
|
|
|
- if let Ok(Some(args)) =
|
|
|
+ if let Ok(Some((var_storage, inputs, output))) =
|
|
|
futures::executor::block_on(timeout(max_timeout, queue.recv()))
|
|
|
{
|
|
|
// drop queue lock to release the mutex so any other
|
|
|
// free VM can use it to listen for incoming messages
|
|
|
drop(queue);
|
|
|
+
|
|
|
+ let (_, lua) = Self::get_lua_vm(&bytecode);
|
|
|
+ let f: Function = lua.globals().get("add").unwrap();
|
|
|
+
|
|
|
running.fetch_add(1, Ordering::Relaxed);
|
|
|
- let ret = f.call::<I, Value>(args.0).unwrap();
|
|
|
- let _ = args.1.send(R::from_lua(ret, &lua).unwrap());
|
|
|
+ let ret = f.call::<I, Value>(inputs).unwrap();
|
|
|
+ drop(var_storage);
|
|
|
+ let _ = output.send(R::from_lua(ret, &lua).unwrap());
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
@@ -135,7 +192,7 @@ where
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- pub async fn exec(&self, input: I) -> R {
|
|
|
+ pub async fn exec(&self, var_storage: Arc<X>, input: I) -> R {
|
|
|
let (return_notifier, return_listener) = oneshot::channel();
|
|
|
Self::spawn(
|
|
|
self.opcodes.clone(),
|
|
@@ -144,16 +201,26 @@ where
|
|
|
self.receiver.clone(),
|
|
|
);
|
|
|
self.sender
|
|
|
- .send((input, return_notifier))
|
|
|
+ .send((var_storage, input, return_notifier))
|
|
|
.await
|
|
|
.expect("valid");
|
|
|
return_listener.await.expect("valid")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-#[derive(Debug)]
|
|
|
+pub struct VarStorageMem;
|
|
|
+
|
|
|
+#[async_trait::async_trait]
|
|
|
+impl VarStorage for VarStorageMem {
|
|
|
+ async fn get(&self, var: &Variable) -> VarValue {
|
|
|
+ VarValue::Nil
|
|
|
+ }
|
|
|
+
|
|
|
+ async fn set(&mut self, var: &Variable, value: VarValue) {}
|
|
|
+}
|
|
|
+
|
|
|
pub struct VirtualMachines<K: Hash + Eq> {
|
|
|
- vms: RwLock<HashMap<K, Program<(i64, i64), i64>>>,
|
|
|
+ vms: RwLock<HashMap<K, Program<VarStorageMem, (i64, i64), i64>>>,
|
|
|
}
|
|
|
|
|
|
impl<K: Hash + Eq> VirtualMachines<K> {
|
|
@@ -168,9 +235,9 @@ impl<K: Hash + Eq> VirtualMachines<K> {
|
|
|
.await
|
|
|
}
|
|
|
|
|
|
- pub async fn exec(&self, id: &K) -> Option<i64> {
|
|
|
+ pub async fn exec(&self, var_storage: Arc<VarStorageMem>, id: &K) -> Option<i64> {
|
|
|
if let Some(vm) = self.vms.read().await.get(id) {
|
|
|
- Some(vm.exec((22, 33)).await)
|
|
|
+ Some(vm.exec(var_storage, (22, 33)).await)
|
|
|
} else {
|
|
|
None
|
|
|
}
|
|
@@ -196,7 +263,10 @@ impl<K: Hash + Eq> VirtualMachines<K> {
|
|
|
#[tokio::main]
|
|
|
async fn main() {
|
|
|
use std::{sync::Arc, time::Instant};
|
|
|
- async fn do_loop(vms: Arc<VirtualMachines<String>>) {
|
|
|
+
|
|
|
+ let mem = Arc::new(VarStorageMem {});
|
|
|
+
|
|
|
+ async fn do_loop(mem: Arc<VarStorageMem>, vms: Arc<VirtualMachines<String>>) {
|
|
|
// Create N threads to execute the Lua code in parallel
|
|
|
let num_threads = 400;
|
|
|
let (tx, mut rx) = mpsc::channel(num_threads);
|
|
@@ -204,9 +274,12 @@ async fn main() {
|
|
|
let vm = vms.clone();
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
|
+ let mem_for_worker = mem.clone();
|
|
|
+ let result = vm.exec(mem_for_worker.clone(), &"foo".to_owned()).await;
|
|
|
+
|
|
|
tokio::spawn(async move {
|
|
|
let start_time = Instant::now();
|
|
|
- let result = vm.exec(&"foo".to_owned()).await;
|
|
|
+ let result = vm.exec(mem_for_worker, &"foo".to_owned()).await;
|
|
|
|
|
|
// Send the result back to the main thread
|
|
|
let _ = tx_clone.send(result).await;
|
|
@@ -236,9 +309,7 @@ async fn main() {
|
|
|
let vms = VirtualMachines::new();
|
|
|
|
|
|
// Compile Lua code
|
|
|
- let code = r#"
|
|
|
- calls = 0
|
|
|
- pid = 0
|
|
|
+ let _code = r#"
|
|
|
function add(a, b)
|
|
|
calls = calls + 1
|
|
|
print("Call from old " .. pid .. " " .. calls)
|
|
@@ -246,9 +317,13 @@ async fn main() {
|
|
|
end
|
|
|
print("hello world " .. pid)
|
|
|
"#;
|
|
|
+ let code = r#"
|
|
|
+ require("foo")
|
|
|
+ print("hello world " .. pid)
|
|
|
+ "#;
|
|
|
|
|
|
let _ = vms.register_program("foo".to_owned(), code).await;
|
|
|
- do_loop(vms.clone()).await;
|
|
|
+ do_loop(mem.clone(), vms.clone()).await;
|
|
|
|
|
|
let code = r#"
|
|
|
calls = 0
|
|
@@ -264,7 +339,7 @@ async fn main() {
|
|
|
println!("{} {:?}", "foo", y);
|
|
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
|
|
|
|
|
- do_loop(vms.clone()).await;
|
|
|
+ do_loop(mem.clone(), vms.clone()).await;
|
|
|
|
|
|
vms.shutdown().await;
|
|
|
|