use futures::executor::block_on;
use mlua::{Compiler, Lua, Table, Value};
use std::{
    collections::HashMap,
    hash::Hash,
    sync::{
        atomic::{AtomicU16, AtomicUsize, Ordering},
        Arc,
    },
};
use tokio::{
    sync::{mpsc, oneshot, Mutex, RwLock},
    time::{timeout, Duration},
};

#[async_trait::async_trait]
pub trait VarStorage: Send + Sync {
    async fn get(&self, instance: usize, var: Variable) -> VarValue;

    async fn set(&self, instance: usize, var: Variable, value: VarValue);

    async fn shutdown(&self, instance: usize);
}

type Sender<I, R> = mpsc::Sender<(Vec<I>, oneshot::Sender<R>)>;
type Receiver<I, R> = mpsc::Receiver<(Vec<I>, oneshot::Sender<R>)>;

#[derive(Debug)]
pub struct Program<X>
where
    X: VarStorage + 'static,
{
    opcodes: Vec<u8>,
    instances: Arc<AtomicUsize>,
    running: Arc<AtomicU16>,
    execution_id: Arc<AtomicUsize>,
    storage: Arc<X>,
    sender: Sender<VarValue, VarValue>,
    receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
}

#[derive(Debug, Clone)]
pub enum VarValue {
    /// The Lua value `nil`.
    Nil,
    /// The Lua value `true` or `false`.
    Boolean(bool),
    /// Integer number
    Integer(i128),
    /// A floating point number.
    Number(f64),
    /// String
    String(String),
    /// A vector
    Vector(Vec<VarValue>),
    /// A
    HashMap(HashMap<String, VarValue>),
    /// An error
    ErrorType(String),
}

pub enum Variable {
    Balances,
    Accounts,
    Transactions,
    Payments,
    Other(String),
}

impl From<String> for Variable {
    fn from(s: String) -> Self {
        match s.as_str() {
            "balances" => Self::Balances,
            "accounts" => Self::Accounts,
            "transactions" => Self::Transactions,
            "payments" => Self::Payments,
            _ => Self::Other(s),
        }
    }
}

impl Variable {
    pub fn name<'a>(&'a self) -> &'a str {
        match self {
            Self::Balances => "balances",
            Self::Accounts => "accounts",
            Self::Transactions => "transactions",
            Self::Payments => "payments",
            Self::Other(s) => s,
        }
    }
}

impl<X> Program<X>
where
    X: VarStorage + 'static,
{
    pub fn new(opcodes: Vec<u8>, storage: Arc<X>) -> Program<X> {
        let (sender, receiver) = mpsc::channel(100);
        Self {
            storage,
            instances: Arc::new(AtomicUsize::new(0)),
            execution_id: Arc::new(AtomicUsize::new(0)),
            running: Arc::new(AtomicU16::new(0)),
            receiver: Arc::new(Mutex::new(receiver)),
            opcodes,
            sender,
        }
    }

    fn var_value_to_lua_val(lua: &Lua, value: VarValue) -> mlua::Result<Value> {
        match value {
            VarValue::Nil => Ok(Value::Nil),
            VarValue::Boolean(b) => Ok(Value::Boolean(b)),
            VarValue::Integer(i) => Ok(Value::Integer(i.try_into().unwrap())),
            VarValue::Number(n) => Ok(Value::Number(n)),
            VarValue::String(s) => Ok(Value::String(lua.create_string(&s)?)),
            VarValue::HashMap(map) => {
                let table = lua.create_table()?;
                for (k, v) in map {
                    table.set(k, Self::var_value_to_lua_val(lua, v)?)?;
                }
                Ok(Value::Table(table))
            }
            VarValue::ErrorType(e) => Err(mlua::Error::RuntimeError(e.to_string())),
            _ => Err(mlua::Error::RuntimeError("Invalid type".into())),
        }
    }

    fn inject_dynamic_global_state(
        lua: &Lua,
        storage: Arc<X>,
        instance: usize,
    ) -> mlua::Result<Option<Table>> {
        lua.set_app_data(storage);

        let getter = lua.create_function(move |lua, (global, key): (Table, String)| {
            match global.raw_get::<_, Value>(key.clone())?.into() {
                Value::Nil => (),
                local_value => return Ok(local_value),
            };
            let storage = lua
                .app_data_ref::<Arc<X>>()
                .ok_or(mlua::Error::MismatchedRegistryKey)?
                .clone();
            let value = block_on(async move { storage.get(instance, key.into()).await });
            Self::var_value_to_lua_val(lua, value)
        })?;
        let setter =
            lua.create_function(move |lua, (global, key, value): (Table, String, Value)| {
                let storage = lua
                    .app_data_ref::<Arc<X>>()
                    .ok_or(mlua::Error::MismatchedRegistryKey)?
                    .clone();
                let value: VarValue = if let Ok(value) = value.as_ref().try_into() {
                    value
                } else {
                    return global.raw_set(key, value);
                };
                block_on(async move {
                    storage.set(instance, key.into(), value).await;
                    Ok(())
                })
            })?;

        let metatable = lua.create_table()?;
        metatable.raw_set("__index", getter)?;
        metatable.raw_set("__newindex", setter)?;

        Ok(Some(metatable))
    }

    /// Returns a new Lua VM and a list of all the global variables to be
    /// persisted and read from the storage engine. Since lua is a dynamic
    /// language, other state variables may be read/updated dynamically, which
    /// is fine, this list is just for the initial state and any potential
    /// optimization.
    fn execute_program(state: Arc<X>, instance: usize, bytecode: &[u8]) -> mlua::Result<VarValue> {
        let lua = Lua::new();
        let globals = lua.globals();

        let require = lua.create_function(|_, (_,): (String,)| -> mlua::Result<()> {
            Err(mlua::Error::RuntimeError("require is not allowed".into()))
        })?;

        globals.set_metatable(Self::inject_dynamic_global_state(
            &lua,
            state.clone(),
            instance,
        )?);
        lua.set_memory_limit(100 * 1024 * 1024)?;

        // remove external require
        globals.set("require", require)?;
        drop(globals);

        // load main program
        let x: Value = lua.load(bytecode).call(())?;

        // shutdown the execution and let the storage / state engine know so all
        // locked variables by this execution_id can be released
        block_on(async move {
            state.shutdown(instance).await;
        });

        x.as_ref().try_into().map_err(|_| mlua::Error::StackError)
    }

    fn spawn(
        storage: Arc<X>,
        bytecode: Vec<u8>,
        instances: Arc<AtomicUsize>,
        exec_id: Arc<AtomicUsize>,
        running: Arc<AtomicU16>,
        receiver: Arc<Mutex<Receiver<VarValue, VarValue>>>,
    ) {
        if instances.load(Ordering::Relaxed) > 100 {
            return;
        }

        instances.fetch_add(1, Ordering::Relaxed);
        let max_timeout = Duration::from_secs(30);

        tokio::task::spawn_blocking(move || {
            loop {
                if let Ok(mut queue) =
                    futures::executor::block_on(timeout(max_timeout, receiver.lock()))
                {
                    if let Ok(Some((_inputs, output))) =
                        futures::executor::block_on(timeout(max_timeout, queue.recv()))
                    {
                        let exec_id: usize = exec_id.fetch_add(1, Ordering::Relaxed);
                        // drop queue lock to release the mutex so any other
                        // free VM can use it to listen for incoming messages
                        drop(queue);

                        let ret =
                            Self::execute_program(storage.clone(), exec_id, &bytecode).unwrap();

                        running.fetch_add(1, Ordering::Relaxed);
                        let _ = output.send(ret).unwrap();
                        continue;
                    }
                }
                break;
            }

            println!("Lua listener is exiting");
            instances.fetch_sub(1, Ordering::Relaxed);
        });
    }

    pub async fn exec(&self, input: Vec<VarValue>) -> VarValue {
        let (return_notifier, return_listener) = oneshot::channel();
        Self::spawn(
            self.storage.clone(),
            self.opcodes.clone(),
            self.instances.clone(),
            self.execution_id.clone(),
            self.running.clone(),
            self.receiver.clone(),
        );
        self.sender
            .send((input, return_notifier))
            .await
            .expect("valid");
        return_listener.await.expect("valid")
    }
}

impl TryFrom<&Value<'_>> for VarValue {
    type Error = String;

    fn try_from(value: &Value<'_>) -> Result<Self, Self::Error> {
        match value {
            Value::Nil => Ok(VarValue::Nil),
            Value::Boolean(b) => Ok(VarValue::Boolean(*b)),
            Value::Integer(i) => Ok(VarValue::Integer((*i).into())),
            Value::Number(n) => Ok(VarValue::Number(*n)),
            Value::String(s) => Ok(VarValue::String(s.to_str().unwrap().to_owned())),
            Value::Table(t) => {
                let mut map = HashMap::new();
                let mut iter = t.clone().pairs::<String, Value>().enumerate();
                let mut is_vector = true;
                while let Some((id, Ok((k, v)))) = iter.next() {
                    if Ok(id + 1) != k.parse() {
                        is_vector = false;
                    }
                    map.insert(k, v.as_ref().try_into()?);
                }

                Ok(if is_vector {
                    let mut values = map
                        .into_iter()
                        .map(|(k, v)| k.parse().map(|k| (k, v)))
                        .collect::<Result<Vec<(usize, VarValue)>, _>>()
                        .unwrap();

                    values.sort_by(|(a, _), (b, _)| a.cmp(b));

                    VarValue::Vector(values.into_iter().map(|(_, v)| v).collect())
                } else {
                    VarValue::HashMap(map)
                })
            }
            x => Err(format!("Invalid type: {:?}", x)),
        }
    }
}

#[derive(Debug)]
pub struct VarStorageMem {
    storage: RwLock<HashMap<String, VarValue>>,
    locks: RwLock<HashMap<String, usize>>,
    var_locked_by_instance: Mutex<HashMap<usize, Vec<String>>>,
}

impl Default for VarStorageMem {
    fn default() -> Self {
        Self {
            storage: RwLock::new(HashMap::new()),
            locks: RwLock::new(HashMap::new()),
            var_locked_by_instance: Mutex::new(HashMap::new()),
        }
    }
}

impl VarStorageMem {
    async fn lock(&self, instance: usize, var: &Variable) {
        let locks = self.locks.read().await;
        let name = var.name();

        if locks.get(name).map(|v| *v) == Some(instance) {
            // The variable is already locked by this instance
            return;
        }

        drop(locks);

        loop {
            // wait here while the locked is not null or it is locked by another
            // instance
            let locks = self.locks.read().await;
            let var_lock = locks.get(name).map(|v| *v);
            if var_lock.is_none() || var_lock == Some(instance) {
                break;
            }
            drop(locks);
            tokio::time::sleep(Duration::from_micros(10)).await;
        }

        loop {
            let mut locks = self.locks.write().await;
            let var_lock = locks.get(name).map(|v| *v);

            if !var_lock.is_none() {
                if var_lock == Some(instance) {
                    break;
                }
                drop(locks);
                tokio::time::sleep(Duration::from_micros(10)).await;
                continue;
            }

            locks.insert(name.to_owned(), instance);

            let mut vars_by_instance = self.var_locked_by_instance.lock().await;
            if let Some(vars) = vars_by_instance.get_mut(&instance) {
                vars.push(name.to_owned());
            } else {
                vars_by_instance.insert(instance, vec![name.to_owned()]);
            }
        }
    }
}

#[async_trait::async_trait]
impl VarStorage for VarStorageMem {
    async fn get(&self, instance: usize, var: Variable) -> VarValue {
        self.lock(instance, &var).await;
        self.storage
            .read()
            .await
            .get(var.name())
            .cloned()
            .unwrap_or(VarValue::Nil)
    }

    async fn set(&self, instance: usize, var: Variable, value: VarValue) {
        self.lock(instance, &var).await;
        self.storage
            .write()
            .await
            .insert(var.name().to_owned(), value);
    }

    async fn shutdown(&self, instance: usize) {
        let mut vars_by_instance = self.var_locked_by_instance.lock().await;
        let mut locks = self.locks.write().await;

        if let Some(vars) = vars_by_instance.remove(&instance) {
            for var in vars {
                if locks.get(&var).map(|v| *v) == Some(instance) {
                    locks.remove(&var);
                }
            }
        }
    }
}

pub struct Runtime<K: Hash + Eq, X: VarStorage + 'static> {
    vms: RwLock<HashMap<K, Program<X>>>,
}

impl<K: Hash + Eq, X: VarStorage + 'static> Runtime<K, X> {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            vms: RwLock::new(HashMap::new()),
        })
    }

    pub async fn register_program(&self, name: K, program: &str, storage: Arc<X>) -> bool {
        self.register_opcodes(name, Compiler::new().compile(program), storage)
            .await
    }

    pub async fn exec(&self, id: &K) -> Option<VarValue> {
        if let Some(vm) = self.vms.read().await.get(id) {
            Some(vm.exec(vec![VarValue::Integer(1)]).await)
        } else {
            None
        }
    }

    pub async fn register_opcodes(&self, name: K, opcodes: Vec<u8>, storage: Arc<X>) -> bool {
        let mut vms = self.vms.write().await;

        vms.insert(name, Program::new(opcodes, storage)).is_some()
    }

    pub async fn shutdown(&self) {
        let mut vms = self.vms.write().await;
        vms.clear();
    }
}

#[tokio::main]
async fn main() {
    use std::{sync::Arc, time::Instant};

    let mem = Arc::new(VarStorageMem::default());

    async fn do_loop(vms: Arc<Runtime<String, VarStorageMem>>) {
        // Create N threads to execute the Lua code in parallel
        let num_threads = 400;
        let (tx, mut rx) = mpsc::channel(num_threads);
        for _ in 0..num_threads {
            let vm = vms.clone();
            let tx_clone = tx.clone();

            tokio::spawn(async move {
                let start_time = Instant::now();
                let result = vm.exec(&"foo".to_owned()).await;

                // Send the result back to the main thread
                let _ = tx_clone.send(result).await;

                let elapsed_time = Instant::now() - start_time;

                // Print the elapsed time in seconds and milliseconds
                println!(
                    "Elapsed time: {} seconds {} milliseconds",
                    elapsed_time.as_secs(),
                    elapsed_time.as_millis(),
                );
            });
        }

        drop(tx);

        loop {
            let result = rx.recv().await;
            if result.is_none() {
                break;
            }
            println!("Result: {:?}", result.unwrap());
        }
    }

    let vms = Runtime::new();

    // Compile Lua code
    let _code = r#"
        function add(a, b)
            calls = calls + 1
            print("Call from old " .. pid .. " "  .. calls)
            return a + b
        end
        print("hello world " .. pid)
    "#;
    let code = r#"
        if pid == nil then
            pid = 0
        end
        pid = pid + 1
        print("hello world " .. pid)
        return true
    "#;

    let _ = vms
        .register_program("foo".to_owned(), code, mem.clone())
        .await;
    do_loop(vms.clone()).await;

    let code = r#"
        if pid == nil then
            pid = 0
        end
        pid = pid + 1
        function add(a, b)
            foo = {1,"foo"}
            print("Call from new " .. pid .. " ")
            return a + b
        end
        print("hello world " .. pid .. " = " .. add(pid, pid))
        return false
    "#;
    let y = vms
        .register_program("foo".to_owned(), code, mem.clone())
        .await;
    tokio::time::sleep(Duration::from_secs(3)).await;

    do_loop(vms.clone()).await;

    vms.shutdown().await;

    tokio::time::sleep(Duration::from_secs(1)).await;

    println!("{} {:?}", "foo", mem);
}