|
@@ -0,0 +1,210 @@
|
|
|
+use mlua::{Compiler, FromLua, Function, IntoLuaMulti, Lua, Value};
|
|
|
+use sha2::{Digest, Sha256};
|
|
|
+use std::{
|
|
|
+ collections::HashMap,
|
|
|
+ sync::{
|
|
|
+ atomic::{AtomicU16, Ordering},
|
|
|
+ Arc,
|
|
|
+ },
|
|
|
+};
|
|
|
+use tokio::{
|
|
|
+ sync::{mpsc, oneshot, Mutex, RwLock},
|
|
|
+ time::{timeout, Duration},
|
|
|
+};
|
|
|
+
|
|
|
+pub type ProgramId = [u8; 32];
|
|
|
+
|
|
|
+type Sender<I, R> = mpsc::Sender<(I, oneshot::Sender<R>)>;
|
|
|
+type Receiver<I, R> = mpsc::Receiver<(I, oneshot::Sender<R>)>;
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct Program<I, R>
|
|
|
+where
|
|
|
+ for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
+ for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
+{
|
|
|
+ opcodes: Vec<u8>,
|
|
|
+ instances: Arc<AtomicU16>,
|
|
|
+ running: Arc<AtomicU16>,
|
|
|
+ sender: Sender<I, R>,
|
|
|
+ receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl<I, R> Program<I, R>
|
|
|
+where
|
|
|
+ for<'lua> I: IntoLuaMulti<'lua> + Sync + Send + 'lua,
|
|
|
+ for<'lua> R: FromLua<'lua> + Clone + Sync + Send + 'lua,
|
|
|
+{
|
|
|
+ pub fn new(opcodes: Vec<u8>) -> Program<I, R> {
|
|
|
+ let (sender, receiver) = mpsc::channel(100);
|
|
|
+ Self {
|
|
|
+ instances: Arc::new(AtomicU16::new(0)),
|
|
|
+ running: Arc::new(AtomicU16::new(0)),
|
|
|
+ receiver: Arc::new(Mutex::new(receiver)),
|
|
|
+ opcodes,
|
|
|
+ sender,
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ fn spawn(
|
|
|
+ bytecode: Vec<u8>,
|
|
|
+ instances: Arc<AtomicU16>,
|
|
|
+ running: Arc<AtomicU16>,
|
|
|
+ receiver: Arc<Mutex<Receiver<I, R>>>,
|
|
|
+ ) {
|
|
|
+ if instances.load(Ordering::Relaxed) > 10 {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ let pid = instances.fetch_add(1, Ordering::Relaxed).to_string();
|
|
|
+ let max_timeout = Duration::from_secs(30);
|
|
|
+ //let running = &self.running;
|
|
|
+
|
|
|
+ tokio::task::spawn_blocking(move || {
|
|
|
+ let lua = Lua::new();
|
|
|
+ lua.globals().raw_remove("require").unwrap();
|
|
|
+ let _ = lua.globals().set("pid", pid);
|
|
|
+ lua.load(&bytecode).exec().unwrap();
|
|
|
+
|
|
|
+ let f: Function = lua.globals().get("add").unwrap();
|
|
|
+
|
|
|
+ loop {
|
|
|
+ if let Ok(mut queue) =
|
|
|
+ futures::executor::block_on(timeout(max_timeout, receiver.lock()))
|
|
|
+ {
|
|
|
+ if let Ok(Some(args)) =
|
|
|
+ futures::executor::block_on(timeout(max_timeout, queue.recv()))
|
|
|
+ {
|
|
|
+ // drop queue lock to release the mutex so any other
|
|
|
+ // free VM can use it to listen for incoming messages
|
|
|
+ drop(queue);
|
|
|
+ running.fetch_add(1, Ordering::Relaxed);
|
|
|
+ let ret = f.call::<I, Value>(args.0).unwrap();
|
|
|
+ let _ = args.1.send(R::from_lua(ret, &lua).unwrap());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ println!("Lua is exiting");
|
|
|
+ instances.fetch_sub(1, Ordering::Relaxed);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn exec(&self, input: I) -> R {
|
|
|
+ let (return_notifier, return_listener) = oneshot::channel();
|
|
|
+ Self::spawn(
|
|
|
+ self.opcodes.clone(),
|
|
|
+ self.instances.clone(),
|
|
|
+ self.running.clone(),
|
|
|
+ self.receiver.clone(),
|
|
|
+ );
|
|
|
+ self.sender
|
|
|
+ .send((input, return_notifier))
|
|
|
+ .await
|
|
|
+ .expect("valid");
|
|
|
+ return_listener.await.expect("valid")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[derive(Debug)]
|
|
|
+pub struct VirtualMachines {
|
|
|
+ vms: RwLock<HashMap<ProgramId, Program<(i64, i64), i64>>>,
|
|
|
+}
|
|
|
+
|
|
|
+impl VirtualMachines {
|
|
|
+ pub async fn register_program(&self, program: &str) -> ProgramId {
|
|
|
+ self.register_opcodes(Compiler::new().compile(program))
|
|
|
+ .await
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn exec(&self, id: &ProgramId) -> Option<i64> {
|
|
|
+ if let Some(vm) = self.vms.read().await.get(id) {
|
|
|
+ Some(vm.exec((22, 33)).await)
|
|
|
+ } else {
|
|
|
+ None
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub async fn register_opcodes(&self, opcodes: Vec<u8>) -> ProgramId {
|
|
|
+ let mut vms = self.vms.write().await;
|
|
|
+
|
|
|
+ let mut hasher = Sha256::new();
|
|
|
+ hasher.update(&opcodes);
|
|
|
+ let id: ProgramId = hasher.finalize().into();
|
|
|
+
|
|
|
+ if !vms.contains_key(&id) {
|
|
|
+ vms.insert(id.clone(), Program::new(opcodes));
|
|
|
+ }
|
|
|
+ id
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Default for VirtualMachines {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self {
|
|
|
+ vms: RwLock::new(HashMap::new()),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#[tokio::main]
|
|
|
+async fn main() {
|
|
|
+ use std::{sync::Arc, time::Instant};
|
|
|
+ async fn do_loop(vms: Arc<VirtualMachines>, id: ProgramId) {
|
|
|
+ // Create N threads to execute the Lua code in parallel
|
|
|
+ let num_threads = 400;
|
|
|
+ let (tx, mut rx) = mpsc::channel(num_threads);
|
|
|
+ for _ in 0..num_threads {
|
|
|
+ let vm = vms.clone();
|
|
|
+ let tx_clone = tx.clone();
|
|
|
+
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let start_time = Instant::now();
|
|
|
+ let result = vm.exec(&id).await;
|
|
|
+
|
|
|
+ // Send the result back to the main thread
|
|
|
+ let _ = tx_clone.send(result).await;
|
|
|
+
|
|
|
+ let elapsed_time = Instant::now() - start_time;
|
|
|
+
|
|
|
+ // Print the elapsed time in seconds and milliseconds
|
|
|
+ println!(
|
|
|
+ "Elapsed time: {} seconds {} milliseconds",
|
|
|
+ elapsed_time.as_secs(),
|
|
|
+ elapsed_time.as_millis(),
|
|
|
+ );
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ drop(tx);
|
|
|
+
|
|
|
+ loop {
|
|
|
+ let result = rx.recv().await;
|
|
|
+ if result.is_none() {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ println!("Result: {:?}", result,);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let vms = Arc::new(VirtualMachines::default());
|
|
|
+
|
|
|
+ // Compile Lua code
|
|
|
+ let code = r#"
|
|
|
+ calls = 0
|
|
|
+ function add(a, b)
|
|
|
+ calls = calls + 1
|
|
|
+ print("Call " .. pid .. " " .. calls)
|
|
|
+ return a + b
|
|
|
+ end
|
|
|
+ print("hello world " .. pid)
|
|
|
+ "#;
|
|
|
+
|
|
|
+ let id = vms.register_program(code).await;
|
|
|
+ do_loop(vms.clone(), id).await;
|
|
|
+ tokio::time::sleep(Duration::from_secs(30)).await;
|
|
|
+ do_loop(vms.clone(), id).await;
|
|
|
+ tokio::time::sleep(Duration::from_secs(41)).await;
|
|
|
+}
|