|
@@ -1,4 +1,4 @@
|
|
|
-use mlua::{Compiler, FromLua, Function, IntoLua, IntoLuaMulti, Lua, Table, UserData, Value};
|
|
|
+use mlua::{Compiler, FromLua, Function, IntoLuaMulti, Lua, Table, Value};
|
|
|
use sha2::{Digest, Sha256};
|
|
|
use std::{
|
|
|
collections::HashMap,
|
|
@@ -17,25 +17,49 @@ pub type ProgramId = [u8; 32];
|
|
|
|
|
|
#[async_trait::async_trait]
|
|
|
pub trait VarStorage: Send + Sync {
|
|
|
- async fn get(&self, var: &Variable) -> VarValue;
|
|
|
- async fn set(&mut self, var: &Variable, value: VarValue);
|
|
|
+ type Error: ToString;
|
|
|
+
|
|
|
+ async fn get(&self, var: Variable) -> &VarValue<Self::Error>;
|
|
|
+
|
|
|
+ async fn set(&mut self, var: Variable, value: VarValue<Self::Error>);
|
|
|
}
|
|
|
|
|
|
-type Sender<X, I, R> = mpsc::Sender<(Arc<X>, I, oneshot::Sender<R>)>;
|
|
|
-type Receiver<X, I, R> = mpsc::Receiver<(Arc<X>, I, oneshot::Sender<R>)>;
|
|
|
+type Sender<I, R> = mpsc::Sender<(I, oneshot::Sender<R>)>;
|
|
|
+type Receiver<I, R> = mpsc::Receiver<(I, oneshot::Sender<R>)>;
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
pub struct Program<X, I, R>
|
|
|
where
|
|
|
- X: VarStorage,
|
|
|
+ X: VarStorage + 'static,
|
|
|
for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
{
|
|
|
opcodes: Vec<u8>,
|
|
|
instances: Arc<AtomicU16>,
|
|
|
running: Arc<AtomicU16>,
|
|
|
- sender: Sender<X, I, R>,
|
|
|
- receiver: Arc<Mutex<Receiver<X, I, R>>>,
|
|
|
+ storage: Arc<X>,
|
|
|
+ sender: Sender<I, R>,
|
|
|
+ receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug, Clone)]
|
|
|
+pub enum VarValue<E: ToString> {
|
|
|
+ /// The Lua value `nil`.
|
|
|
+ Nil,
|
|
|
+ /// The Lua value `true` or `false`.
|
|
|
+ Boolean(bool),
|
|
|
+ /// Integer number
|
|
|
+ Integer(i128),
|
|
|
+ /// A floating point number.
|
|
|
+ Number(f64),
|
|
|
+ /// String
|
|
|
+ String(String),
|
|
|
+ /// A vector
|
|
|
+ Vector(Vec<VarValue<E>>),
|
|
|
+ /// A
|
|
|
+ HashMap(HashMap<String, VarValue<E>>),
|
|
|
+ /// An error
|
|
|
+ Error(E),
|
|
|
}
|
|
|
|
|
|
pub enum Variable {
|
|
@@ -46,11 +70,6 @@ pub enum Variable {
|
|
|
Other(String),
|
|
|
}
|
|
|
|
|
|
-pub enum VarValue {
|
|
|
- Scalar(String),
|
|
|
- Nil,
|
|
|
-}
|
|
|
-
|
|
|
impl From<String> for Variable {
|
|
|
fn from(s: String) -> Self {
|
|
|
match s.as_str() {
|
|
@@ -75,26 +94,16 @@ impl Variable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub struct MetaState(Vec<Variable>);
|
|
|
-
|
|
|
-impl UserData for MetaState {
|
|
|
- fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
|
|
- methods.add_method_mut("__index", |_, _this, (_, key): (Table<'lua>, String)| {
|
|
|
- panic!("cesar {}", key);
|
|
|
- Ok(format!("dyn value {}", key))
|
|
|
- });
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
impl<X, I, R> Program<X, I, R>
|
|
|
where
|
|
|
- for<'lua> X: VarStorage + 'lua,
|
|
|
+ X: VarStorage + 'static,
|
|
|
for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
{
|
|
|
- pub fn new(opcodes: Vec<u8>) -> Program<X, I, R> {
|
|
|
+ pub fn new(opcodes: Vec<u8>, storage: Arc<X>) -> Program<X, I, R> {
|
|
|
let (sender, receiver) = mpsc::channel(100);
|
|
|
Self {
|
|
|
+ storage,
|
|
|
instances: Arc::new(AtomicU16::new(0)),
|
|
|
running: Arc::new(AtomicU16::new(0)),
|
|
|
receiver: Arc::new(Mutex::new(receiver)),
|
|
@@ -103,61 +112,71 @@ where
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ fn dynamic_global_state(storage: Arc<X>, lua: &Lua) -> Result<Option<Table>, mlua::Error> {
|
|
|
+ lua.set_app_data(storage);
|
|
|
+
|
|
|
+ let getter = lua.create_async_function(|lua, (_, key): (Table, String)| async move {
|
|
|
+ let storage = lua.app_data_ref::<Arc<X>>().unwrap();
|
|
|
+ let x = storage.get(key.into()).await;
|
|
|
+
|
|
|
+ let x = Ok(format!("foo bar -> {}", key));
|
|
|
+
|
|
|
+ drop(storage);
|
|
|
+ x
|
|
|
+ })?;
|
|
|
+ let setter =
|
|
|
+ lua.create_async_function(|_, (_, key, value): (Table, String, Value)| async move {
|
|
|
+ panic!("set {} -> {:?}", key, value);
|
|
|
+ Ok(())
|
|
|
+ })?;
|
|
|
+
|
|
|
+ let metatable = lua.create_table().unwrap();
|
|
|
+ metatable.raw_set("__index", getter).unwrap();
|
|
|
+ metatable.raw_set("__newindex", setter).unwrap();
|
|
|
+
|
|
|
+ Ok(Some(metatable))
|
|
|
+ }
|
|
|
+
|
|
|
/// Returns a new Lua VM and a list of all the global variables to be
|
|
|
/// persisted and read from the storage engine. Since lua is a dynamic
|
|
|
/// language, other state variables may be read/updated dynamically, which
|
|
|
/// is fine, this list is just for the initial state and any potential
|
|
|
/// optimization.
|
|
|
- fn get_lua_vm(bytecode: &[u8]) -> (Vec<Variable>, Lua) {
|
|
|
+ fn get_lua_vm(storage: Arc<X>, bytecode: &[u8]) -> (Vec<Variable>, Lua) {
|
|
|
let lua = Lua::new();
|
|
|
- lua.sandbox(true).unwrap();
|
|
|
let globals = lua.globals();
|
|
|
|
|
|
let require = lua
|
|
|
- .create_function(|_, (name,): (String,)| -> mlua::Result<()> {
|
|
|
+ .create_function(|_, (_,): (String,)| -> mlua::Result<()> {
|
|
|
Err(mlua::Error::RuntimeError("require is not allowed".into()))
|
|
|
})
|
|
|
.unwrap();
|
|
|
|
|
|
- let set = lua
|
|
|
- .create_async_function(|_, (_, key): (Table, String)| async move {
|
|
|
- let x = Ok(format!("foo bar -> {}", key));
|
|
|
- x
|
|
|
- })
|
|
|
- .unwrap();
|
|
|
- let metatable = lua.create_table().unwrap();
|
|
|
- metatable.raw_set("__index", set).unwrap();
|
|
|
+ globals.set_metatable(Self::dynamic_global_state(storage, &lua).unwrap());
|
|
|
+ lua.set_memory_limit(100 * 1024 * 1024).unwrap();
|
|
|
|
|
|
// remove external require
|
|
|
globals.set("require", require).unwrap();
|
|
|
- match globals.get("_G") {
|
|
|
- Ok(Value::Table(table)) => {
|
|
|
- table.set_readonly(false);
|
|
|
- table.set_metatable(Some(metatable));
|
|
|
- table.set_readonly(true);
|
|
|
- }
|
|
|
- _ => {}
|
|
|
- };
|
|
|
+ drop(globals);
|
|
|
|
|
|
// load main program
|
|
|
lua.load(bytecode).exec().unwrap();
|
|
|
panic!("cesdar");
|
|
|
|
|
|
- drop(globals);
|
|
|
-
|
|
|
(vec![], lua)
|
|
|
}
|
|
|
|
|
|
fn spawn(
|
|
|
+ storage: Arc<X>,
|
|
|
bytecode: Vec<u8>,
|
|
|
instances: Arc<AtomicU16>,
|
|
|
running: Arc<AtomicU16>,
|
|
|
- receiver: Arc<Mutex<Receiver<X, I, R>>>,
|
|
|
+ receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
) {
|
|
|
if instances.load(Ordering::Relaxed) > 10 {
|
|
|
return;
|
|
|
}
|
|
|
- let (_, lua) = Self::get_lua_vm(&bytecode);
|
|
|
+ let (_, lua) = Self::get_lua_vm(storage.clone(), &bytecode);
|
|
|
|
|
|
instances.fetch_add(1, Ordering::Relaxed);
|
|
|
let max_timeout = Duration::from_secs(30);
|
|
@@ -167,19 +186,18 @@ where
|
|
|
if let Ok(mut queue) =
|
|
|
futures::executor::block_on(timeout(max_timeout, receiver.lock()))
|
|
|
{
|
|
|
- if let Ok(Some((var_storage, inputs, output))) =
|
|
|
+ if let Ok(Some((inputs, output))) =
|
|
|
futures::executor::block_on(timeout(max_timeout, queue.recv()))
|
|
|
{
|
|
|
// drop queue lock to release the mutex so any other
|
|
|
// free VM can use it to listen for incoming messages
|
|
|
drop(queue);
|
|
|
|
|
|
- let (_, lua) = Self::get_lua_vm(&bytecode);
|
|
|
+ let (_, lua) = Self::get_lua_vm(storage.clone(), &bytecode);
|
|
|
let f: Function = lua.globals().get("add").unwrap();
|
|
|
|
|
|
running.fetch_add(1, Ordering::Relaxed);
|
|
|
let ret = f.call::<I, Value>(inputs).unwrap();
|
|
|
- drop(var_storage);
|
|
|
let _ = output.send(R::from_lua(ret, &lua).unwrap());
|
|
|
continue;
|
|
|
}
|
|
@@ -192,65 +210,116 @@ where
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- pub async fn exec(&self, var_storage: Arc<X>, input: I) -> R {
|
|
|
+ pub async fn exec(&self, input: I) -> R {
|
|
|
let (return_notifier, return_listener) = oneshot::channel();
|
|
|
Self::spawn(
|
|
|
+ self.storage.clone(),
|
|
|
self.opcodes.clone(),
|
|
|
self.instances.clone(),
|
|
|
self.running.clone(),
|
|
|
self.receiver.clone(),
|
|
|
);
|
|
|
self.sender
|
|
|
- .send((var_storage, input, return_notifier))
|
|
|
+ .send((input, return_notifier))
|
|
|
.await
|
|
|
.expect("valid");
|
|
|
return_listener.await.expect("valid")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-pub struct VarStorageMem;
|
|
|
+pub struct VarStorageMem {
|
|
|
+ storage: HashMap<String, VarValue<mlua::Error>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl Default for VarStorageMem {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self {
|
|
|
+ storage: HashMap::new(),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl TryInto<VarValue<mlua::Error>> for Value<'_> {
|
|
|
+ type Error = String;
|
|
|
+
|
|
|
+ fn try_into(
|
|
|
+ self,
|
|
|
+ ) -> Result<VarValue<mlua::Error>, <Value<'static> as TryInto<VarValue<mlua::Error>>>::Error>
|
|
|
+ {
|
|
|
+ match self {
|
|
|
+ Value::Nil => Ok(VarValue::Nil),
|
|
|
+ Value::Boolean(b) => Ok(VarValue::Boolean(b)),
|
|
|
+ Value::Integer(i) => Ok(VarValue::Integer(i.into())),
|
|
|
+ Value::Number(n) => Ok(VarValue::Number(n)),
|
|
|
+ Value::String(s) => Ok(VarValue::String(s.to_str().unwrap().to_owned())),
|
|
|
+ Value::Table(t) => {
|
|
|
+ let mut map = HashMap::new();
|
|
|
+ let mut iter = t.pairs::<String, Value>();
|
|
|
+ while let Some(Ok((k, v))) = iter.next() {
|
|
|
+ map.insert(k, v.try_into()?);
|
|
|
+ }
|
|
|
+ Ok(VarValue::HashMap(map))
|
|
|
+ }
|
|
|
+ _ => Err("Invalid type".into()),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
#[async_trait::async_trait]
|
|
|
impl VarStorage for VarStorageMem {
|
|
|
- async fn get(&self, var: &Variable) -> VarValue {
|
|
|
- VarValue::Nil
|
|
|
+ type Error = mlua::Error;
|
|
|
+
|
|
|
+ async fn get(&self, var: Variable) -> &VarValue<Self::Error> {
|
|
|
+ self.storage.get(var.name()).unwrap_or(&VarValue::Nil)
|
|
|
}
|
|
|
|
|
|
- async fn set(&mut self, var: &Variable, value: VarValue) {}
|
|
|
+ async fn set(&mut self, var: Variable, value: VarValue<Self::Error>) {
|
|
|
+ self.storage.insert(var.name().to_owned(), value);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-pub struct VirtualMachines<K: Hash + Eq> {
|
|
|
- vms: RwLock<HashMap<K, Program<VarStorageMem, (i64, i64), i64>>>,
|
|
|
+pub struct Runtime<K: Hash + Eq, X: VarStorage + 'static> {
|
|
|
+ vms: RwLock<HashMap<K, Program<X, (i64, i64), i64>>>,
|
|
|
}
|
|
|
|
|
|
-impl<K: Hash + Eq> VirtualMachines<K> {
|
|
|
+impl<K: Hash + Eq, X: VarStorage + 'static> Runtime<K, X> {
|
|
|
pub fn new() -> Arc<Self> {
|
|
|
Arc::new(Self {
|
|
|
vms: RwLock::new(HashMap::new()),
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- pub async fn register_program(&self, name: K, program: &str) -> (ProgramId, bool) {
|
|
|
- self.register_opcodes(name, Compiler::new().compile(program))
|
|
|
+ pub async fn register_program(
|
|
|
+ &self,
|
|
|
+ name: K,
|
|
|
+ program: &str,
|
|
|
+ storage: Arc<X>,
|
|
|
+ ) -> (ProgramId, bool) {
|
|
|
+ self.register_opcodes(name, Compiler::new().compile(program), storage)
|
|
|
.await
|
|
|
}
|
|
|
|
|
|
- pub async fn exec(&self, var_storage: Arc<VarStorageMem>, id: &K) -> Option<i64> {
|
|
|
+ pub async fn exec(&self, id: &K) -> Option<i64> {
|
|
|
if let Some(vm) = self.vms.read().await.get(id) {
|
|
|
- Some(vm.exec(var_storage, (22, 33)).await)
|
|
|
+ Some(vm.exec((22, 33)).await)
|
|
|
} else {
|
|
|
None
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- pub async fn register_opcodes(&self, name: K, opcodes: Vec<u8>) -> (ProgramId, bool) {
|
|
|
+ pub async fn register_opcodes(
|
|
|
+ &self,
|
|
|
+ name: K,
|
|
|
+ opcodes: Vec<u8>,
|
|
|
+ storage: Arc<X>,
|
|
|
+ ) -> (ProgramId, bool) {
|
|
|
let mut vms = self.vms.write().await;
|
|
|
|
|
|
let mut hasher = Sha256::new();
|
|
|
hasher.update(&opcodes);
|
|
|
(
|
|
|
hasher.finalize().into(),
|
|
|
- vms.insert(name, Program::new(opcodes)).is_some(),
|
|
|
+ vms.insert(name, Program::new(opcodes, storage)).is_some(),
|
|
|
)
|
|
|
}
|
|
|
|
|
@@ -264,9 +333,9 @@ impl<K: Hash + Eq> VirtualMachines<K> {
|
|
|
async fn main() {
|
|
|
use std::{sync::Arc, time::Instant};
|
|
|
|
|
|
- let mem = Arc::new(VarStorageMem {});
|
|
|
+ let mem = Arc::new(VarStorageMem::default());
|
|
|
|
|
|
- async fn do_loop(mem: Arc<VarStorageMem>, vms: Arc<VirtualMachines<String>>) {
|
|
|
+ async fn do_loop(mem: Arc<VarStorageMem>, vms: Arc<Runtime<String, VarStorageMem>>) {
|
|
|
// Create N threads to execute the Lua code in parallel
|
|
|
let num_threads = 400;
|
|
|
let (tx, mut rx) = mpsc::channel(num_threads);
|
|
@@ -274,12 +343,11 @@ async fn main() {
|
|
|
let vm = vms.clone();
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
|
- let mem_for_worker = mem.clone();
|
|
|
- let result = vm.exec(mem_for_worker.clone(), &"foo".to_owned()).await;
|
|
|
+ let result = vm.exec(&"foo".to_owned()).await;
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
let start_time = Instant::now();
|
|
|
- let result = vm.exec(mem_for_worker, &"foo".to_owned()).await;
|
|
|
+ let result = vm.exec(&"foo".to_owned()).await;
|
|
|
|
|
|
// Send the result back to the main thread
|
|
|
let _ = tx_clone.send(result).await;
|
|
@@ -306,7 +374,7 @@ async fn main() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- let vms = VirtualMachines::new();
|
|
|
+ let vms = Runtime::new();
|
|
|
|
|
|
// Compile Lua code
|
|
|
let _code = r#"
|
|
@@ -318,11 +386,18 @@ async fn main() {
|
|
|
print("hello world " .. pid)
|
|
|
"#;
|
|
|
let code = r#"
|
|
|
- require("foo")
|
|
|
+ --require("foo")
|
|
|
+ local foobar = "foo"
|
|
|
+ foo.foobar = "yy"
|
|
|
+ print("hello world " .. pid)
|
|
|
+ pid = 999
|
|
|
+ foo = "cesar"
|
|
|
print("hello world " .. pid)
|
|
|
"#;
|
|
|
|
|
|
- let _ = vms.register_program("foo".to_owned(), code).await;
|
|
|
+ let _ = vms
|
|
|
+ .register_program("foo".to_owned(), code, mem.clone())
|
|
|
+ .await;
|
|
|
do_loop(mem.clone(), vms.clone()).await;
|
|
|
|
|
|
let code = r#"
|
|
@@ -335,7 +410,9 @@ async fn main() {
|
|
|
end
|
|
|
print("hello world " .. pid)
|
|
|
"#;
|
|
|
- let y = vms.register_program("foo".to_owned(), code).await;
|
|
|
+ let y = vms
|
|
|
+ .register_program("foo".to_owned(), code, mem.clone())
|
|
|
+ .await;
|
|
|
println!("{} {:?}", "foo", y);
|
|
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
|
|
|