Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Async Support #462

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 131 additions & 109 deletions native/wasmex/Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion native/wasmex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ crate-type = ["dylib"]

[dependencies]
rustler = { version = "0.34", features = ["big_integer"] }
futures-lite = "2.3.0"
once_cell = "1.19.0"
rand = "0.8.5"
wasmtime = "23.0.2"
wasmtime = { version = "23.0.2", features = ["async"] }
wasmtime-wasi = "23.0.1"
wasi-common = "23.0.1"
wiggle = "23.0.1"
wat = "1.215.0"
tokio = { version = "1.39.1", features = ["full"] }
1 change: 1 addition & 0 deletions native/wasmex/src/atoms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ rustler::atoms! {
// calls to erlang processes
returned_function_call,
invoke_callback,
async_nif_result,

// engine config - cranelift_opt_level
none,
Expand Down
3 changes: 2 additions & 1 deletion native/wasmex/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn new(config: ExEngineConfig) -> Result<ResourceArc<EngineResource>, rustle
Ok(resource)
}

#[rustler::nif(name = "engine_precompile_module")]
#[rustler::nif(name = "engine_precompile_module", schedule = "DirtyCpu")]
pub fn precompile_module<'a>(
env: rustler::Env<'a>,
engine_resource: ResourceArc<EngineResource>,
Expand Down Expand Up @@ -66,6 +66,7 @@ pub(crate) fn engine_config(engine_config: ExEngineConfig) -> Config {
config.consume_fuel(engine_config.consume_fuel);
config.wasm_backtrace_details(backtrace_details);
config.cranelift_opt_level(cranelift_opt_level);
config.async_support(true);
config
}

Expand Down
124 changes: 72 additions & 52 deletions native/wasmex/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
atoms,
caller::{remove_caller, set_caller},
instance::{map_wasm_values_to_vals, LinkedModule, WasmValue},
instance::{map_wasm_values_to_vals, ImportDefinition, LinkedModule, WasmValue},
memory::MemoryResource,
store::{StoreData, StoreOrCaller, StoreOrCallerResource},
};
use rustler::{
types::tuple, Atom, Encoder, Error, ListIterator, MapIterator, OwnedEnv, ResourceArc, Term,
types::tuple, Atom, Encoder, Error, ListIterator, LocalPid, MapIterator, OwnedEnv, ResourceArc, Term
};
use std::sync::{Condvar, Mutex};
use wasmtime::{Caller, Engine, FuncType, Linker, Val, ValType};
Expand Down Expand Up @@ -52,49 +52,82 @@ pub fn link_modules(
Ok(())
}

pub fn link_imports(
engine: &Engine,
linker: &mut Linker<StoreData>,
pub fn imports_from_map_iterator(
imports: MapIterator,
) -> Result<(), Error> {
) -> Result<Vec<ImportDefinition>, Error> {
let mut result = Vec::new();
for (namespace_name, namespace_definition) in imports {
let namespace_name = namespace_name.decode::<String>()?;
let definition: MapIterator = namespace_definition.decode()?;

for (import_name, import) in definition {
let import_name = import_name.decode::<String>()?;
link_import(engine, linker, &namespace_name, &import_name, import)?;
let import_tuple = tuple::get_tuple(import)?;

let import_type = import_tuple
.first()
.ok_or(Error::Atom("missing_import_type"))?;
let import_type =
Atom::from_term(*import_type).map_err(|_| Error::Atom("import type must be an atom"))?;

if atoms::__fn__().eq(&import_type) {
let param_term = import_tuple
.get(1)
.ok_or(Error::Atom("missing_import_params"))?;
let results_term = import_tuple
.get(2)
.ok_or(Error::Atom("missing_import_results"))?;

let params_signature = param_term
.decode::<ListIterator>()?
.map(term_to_arg_type)
.collect::<Result<Vec<ValType>, _>>()?;

let results_signature = results_term
.decode::<ListIterator>()?
.map(term_to_arg_type)
.collect::<Result<Vec<ValType>, _>>()?;

result.push(ImportDefinition::Function {
namespace: namespace_name.clone(),
name: import_name.clone(),
params: params_signature,
results: results_signature,
});
} else {
return Err(Error::Atom("unknown import type"));
}
}
}
Ok(result)
}

pub fn link_imports(
engine: &Engine,
linker: &mut Linker<StoreData>,
imports: &Vec<ImportDefinition>,
pid: &LocalPid,
) -> Result<(), Error> {
for import_definition in imports {
link_import(engine, linker, import_definition, pid)?;
}
Ok(())
}

fn link_import(
engine: &Engine,
linker: &mut Linker<StoreData>,
namespace_name: &str,
import_name: &str,
definition: Term,
import_definition: &ImportDefinition,
pid: &LocalPid,
) -> Result<(), Error> {
let import_tuple = tuple::get_tuple(definition)?;

let import_type = import_tuple
.first()
.ok_or(Error::Atom("missing_import_type"))?;
let import_type =
Atom::from_term(*import_type).map_err(|_| Error::Atom("import type must be an atom"))?;

if atoms::__fn__().eq(&import_type) {
return link_imported_function(
engine,
linker,
namespace_name.to_string(),
import_name.to_string(),
definition,
);
match import_definition {
ImportDefinition::Function {
namespace,
name,
params,
results,
} => link_imported_function(engine, linker, namespace, name, params, results, pid),
}

Err(Error::Atom("unknown import type"))
}

// Creates a wrapper function used in a Wasm import object.
Expand All @@ -112,32 +145,19 @@ fn link_import(
fn link_imported_function(
engine: &Engine,
linker: &mut Linker<StoreData>,
namespace_name: String,
import_name: String,
definition: Term,
namespace_name: &String,
import_name: &String,
params_signature: &Vec<ValType>,
results_signature: &Vec<ValType>,
pid: &LocalPid,
) -> Result<(), Error> {
let pid = definition.get_env().pid();

let import_tuple = tuple::get_tuple(definition)?;

let param_term = import_tuple
.get(1)
.ok_or(Error::Atom("missing_import_params"))?;
let results_term = import_tuple
.get(2)
.ok_or(Error::Atom("missing_import_results"))?;

let params_signature = param_term
.decode::<ListIterator>()?
.map(term_to_arg_type)
.collect::<Result<Vec<ValType>, _>>()?;

let results_signature = results_term
.decode::<ListIterator>()?
.map(term_to_arg_type)
.collect::<Result<Vec<ValType>, _>>()?;
let namespace_name = namespace_name.clone();
let import_name = import_name.clone();
let params_signature = params_signature.clone();
let results_signature = results_signature.clone();
let signature = FuncType::new(engine, params_signature.clone(), results_signature.clone());
let pid = pid.clone();

let signature = FuncType::new(engine, params_signature, results_signature.clone());
linker
.func_new(
&namespace_name.clone(),
Expand Down
105 changes: 59 additions & 46 deletions native/wasmex/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@

use crate::{
atoms,
environment::{link_imports, link_modules, CallbackTokenResource},
environment::{imports_from_map_iterator, link_imports, link_modules, CallbackTokenResource},
functions,
module::ModuleResource,
printable_term_type::PrintableTermType,
store::{StoreData, StoreOrCaller, StoreOrCallerResource},
task::{self, send_async_nif_result},
};
use rustler::{
env::SavedTerm,
types::{tuple::make_tuple, ListIterator},
Encoder, Env as RustlerEnv, Error, MapIterator, NifMap, NifResult, OwnedEnv, ResourceArc, Term,
TermType,
Atom, Encoder, Env as RustlerEnv, Error, MapIterator, NifMap, NifResult, OwnedEnv,
ResourceArc, Term, TermType,
};
use std::ops::Deref;
use std::sync::Mutex;
use std::thread;
use wasmtime::{Instance, Linker, Module, Val, ValType};
use wasmtime::{AsContextMut, Instance, Linker, Val, ValType};

#[derive(NifMap)]
pub struct LinkedModule {
pub name: String,
pub module_resource: ResourceArc<ModuleResource>,
}

#[derive(Debug, Clone)]
pub enum ImportDefinition {
Function {
namespace: String,
name: String,
params: Vec<ValType>,
results: Vec<ValType>,
},
}

pub struct InstanceResource {
pub inner: Mutex<Instance>,
}
Expand All @@ -43,49 +53,52 @@ impl rustler::Resource for InstanceResource {}
// structure: %{namespace_name: %{import_name: {:fn, param_types, result_types, captured_function}}}
#[rustler::nif(name = "instance_new")]
pub fn new(
env: rustler::Env,
store_or_caller_resource: ResourceArc<StoreOrCallerResource>,
module_resource: ResourceArc<ModuleResource>,
imports: MapIterator,
linked_modules: Vec<LinkedModule>,
) -> Result<ResourceArc<InstanceResource>, rustler::Error> {
let module = module_resource.inner.lock().map_err(|e| {
rustler::Error::Term(Box::new(format!(
"Could not unlock module resource as the mutex was poisoned: {e}"
)))
})?;
let store_or_caller: &mut StoreOrCaller =
&mut *(store_or_caller_resource.inner.lock().map_err(|e| {
rustler::Error::Term(Box::new(format!(
"Could not unlock store_or_caller resource as the mutex was poisoned: {e}"
)))
})?);

let instance = link_and_create_instance(store_or_caller, &module, imports, linked_modules)?;
let resource = ResourceArc::new(InstanceResource {
inner: Mutex::new(instance),
});
Ok(resource)
}

fn link_and_create_instance(
store_or_caller: &mut StoreOrCaller,
module: &Module,
imports: MapIterator,
linked_modules: Vec<LinkedModule>,
) -> Result<Instance, Error> {
let mut linker = Linker::new(store_or_caller.engine());
if let Some(_wasi_ctx) = &store_or_caller.data().wasi {
linker.allow_shadowing(true);
wasi_common::sync::add_to_linker(&mut linker, |s: &mut StoreData| s.wasi.as_mut().unwrap())
.map_err(|err| Error::Term(Box::new(err.to_string())))?;
}

link_imports(store_or_caller.engine(), &mut linker, imports)?;
link_modules(&mut linker, store_or_caller, linked_modules)?;

linker
.instantiate(store_or_caller, module)
.map_err(|err| Error::Term(Box::new(err.to_string())))
) -> NifResult<(Atom, Atom)> {
// TODO: pass pid as parameter instead of hardcoding it
let pid = env.pid();
// create erlang environment for the thread
let imports = imports_from_map_iterator(imports)?;

send_async_nif_result(env, async move {
let mut store_or_caller = (store_or_caller_resource.inner.lock().map_err(|e| {
format!("Could not unlock store resource as the mutex was poisoned: {e}")
}))?;

let module = module_resource.inner.lock().map_err(|e| {
format!("Could not unlock module resource as the mutex was poisoned: {e}")
}).map(|module| module.clone())?;

let mut linker = Linker::new(store_or_caller.engine());
if let Some(_wasi_ctx) = &store_or_caller.data().wasi {
linker.allow_shadowing(true);
wasi_common::sync::add_to_linker(&mut linker, |s: &mut StoreData| s.wasi.as_mut().unwrap())
.map_err(|err| err.to_string())?;
}

link_imports(store_or_caller.engine(), &mut linker, &imports, &pid).map_err(|e| format!("{:?}", e))?;
link_modules(&mut linker, &mut *store_or_caller, linked_modules).map_err(|e| format!("{:?}", e))?;

linker
.instantiate_async(store_or_caller.as_context_mut(), &module)
.await
.map_err(|err| Error::Term(Box::new(err.to_string())))
.map(|instance| {
ResourceArc::new(InstanceResource {
inner: Mutex::new(instance),
})
})
.map(|instance_resource| (
atoms::returned_function_call(),
(atoms::ok(), instance_resource),
atoms::async_nif_result()
))
.map_err(|error| format!("{:?}", error))
})
}

#[rustler::nif(name = "instance_get_global_value", schedule = "DirtyCpu")]
Expand Down Expand Up @@ -207,7 +220,7 @@ pub fn function_export_exists(
Ok(result)
}

#[rustler::nif(name = "instance_call_exported_function", schedule = "DirtyCpu")]
#[rustler::nif(name = "instance_call_exported_function")]
pub fn call_exported_function(
env: rustler::Env,
store_or_caller_resource: ResourceArc<StoreOrCallerResource>,
Expand All @@ -223,7 +236,7 @@ pub fn call_exported_function(
let function_params = thread_env.save(params);
let from = thread_env.save(from);

thread::spawn(move || {
task::spawn(async move {
thread_env.send_and_clear(&pid, |thread_env| {
execute_function(
thread_env,
Expand Down
24 changes: 13 additions & 11 deletions native/wasmex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
pub mod atoms;
pub mod caller;
pub mod engine;
pub mod environment;
pub mod functions;
pub mod instance;
pub mod memory;
pub mod module;
pub mod pipe;
pub mod printable_term_type;
pub mod store;
mod atoms;
mod caller;
mod engine;
mod environment;
mod functions;
mod instance;
mod memory;
mod module;
mod pipe;
mod printable_term_type;
mod store;
mod store_limits;
mod task;

rustler::init!("Elixir.Wasmex.Native");
Loading
Loading