diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 16222df..7e635d2 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -64,12 +64,9 @@ jobs: # Build for wasm32-wasi target - name: Build wasm32-wasi Target - run: | - for member in crates/wasm/ examples/water_bins/ss_client_wasm_v1/ examples/water_bins/echo_client/; do - cargo build --verbose --manifest-path $member/Cargo.toml --target wasm32-wasi - done + run: bash ./scripts/build_wasm_targets.sh env: RUSTFLAGS: --cfg tokio_unstable - name: Test - run: cargo test --verbose --workspace --all-features + run: cargo test --verbose --workspace --all-features \ No newline at end of file diff --git a/crates/wasm_v0/.cargo/config b/crates/wasm_v0/.cargo/config new file mode 100644 index 0000000..bca99d0 --- /dev/null +++ b/crates/wasm_v0/.cargo/config @@ -0,0 +1,5 @@ +[build] +target = "wasm32-wasi" + +[target.wasm32-wasi] +rustflags = [ "--cfg", "tokio_unstable"] diff --git a/crates/wasm_v0/Cargo.toml b/crates/wasm_v0/Cargo.toml new file mode 100644 index 0000000..03036fa --- /dev/null +++ b/crates/wasm_v0/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "water-wasm-v0" +version = "0.1.0" +authors.workspace = true +description.workspace = true +edition.workspace = true + +[lib] +name = "water_wasm_v0" +path = "src/lib.rs" +crate-type = ["cdylib", "lib"] + +[dependencies] +tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] } +tokio-util = { version = "0.7.1", features = ["codec"] } + +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.107" +bincode = "1.3" + +anyhow = "1.0.7" +tracing = "0.1" +tracing-subscriber = "0.3.17" +toml = "0.5.9" +lazy_static = "1.4" +url = { version = "2.2.2", features = ["serde"] } +libc = "0.2.147" + diff --git a/crates/wasm_v0/src/common.rs b/crates/wasm_v0/src/common.rs new file mode 100644 index 0000000..ae945e7 --- /dev/null +++ b/crates/wasm_v0/src/common.rs @@ -0,0 +1,96 @@ +use std::os::fd::FromRawFd; +use tokio::net::TcpStream; + +// WASI Imports +extern "C" { + pub fn host_accept() -> i32; // obtain a connection (specified by returned fd) accepted by the host + pub fn host_dial() -> i32; // obtain a connection (specified by returned fd) dialed by the host + pub fn host_defer(); // call when exiting + #[allow(dead_code)] + pub fn pull_config() -> i32; // obtain a configuration file (specified by returned fd) from the host +} + +// enumerated constants for Role (i32) +// 0: unknown +// 1: dialer +// 2: listener +// 3: relay +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Role { + Unknown = 0, + Dialer = 1, + Listener = 2, + Relay = 3, +} + +pub struct AsyncFdConn { + fd: i32, + temp_stream: Option, // used to hold the std tcp stream, will be upgraded to tokio stream later + stream: Option, +} + +impl Default for AsyncFdConn { + fn default() -> Self { + Self::new() + } +} + +impl AsyncFdConn { + pub fn new() -> Self { + AsyncFdConn { + fd: -1, + temp_stream: None, + stream: None, + } + } + + pub fn wrap(&mut self, fd: i32) -> Result<(), String> { + if self.fd > 0 { + return Err("already wrapped".to_string()); + } + if fd < 0 { + return Err("invalid fd".to_string()); + } + self.fd = fd; + println!("wrap: fd = {}", fd); + let stdstream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + + self.temp_stream = Some(stdstream); + // println!("wrap: stdstream = {:?}", stdstream); + // stdstream + // .set_nonblocking(true) + // .expect("Failed to set non-blocking"); + + // println!("wrap: stream = {:?}", stdstream); + // self.stream = + // Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream")); + // Ok(()) + Ok(()) + } + + pub fn tokio_upgrade(&mut self) -> Result<(), String> { + if self.fd < 0 { + return Err("invalid fd".to_string()); + } + let stdstream = self.temp_stream.take().unwrap(); + stdstream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + self.stream = + Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream")); + Ok(()) + } + + pub fn close(&mut self) { + if self.fd < 0 { + return; + } + let stream = self.stream.take().unwrap(); + drop(stream); + self.fd = -1; + } + + pub fn stream(&mut self) -> Option<&mut TcpStream> { + self.stream.as_mut() + } +} diff --git a/crates/wasm_v0/src/error.rs b/crates/wasm_v0/src/error.rs new file mode 100644 index 0000000..c816173 --- /dev/null +++ b/crates/wasm_v0/src/error.rs @@ -0,0 +1,20 @@ +// Error is a enum in i32 +#[allow(dead_code)] +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum Error { + None = 0, + Unknown = -1, // general error + InvalidArgument = -2, // invalid argument supplied to func call + InvalidConfig = -3, // config file provided is invalid + InvalidFd = -4, // invalid file descriptor provided + InvalidFunction = -5, // invalid function called + DoubleInit = -6, // initializing twice + FailedIO = -7, // Failing an I/O operation + NotInitialized = -8, // not initialized +} + +impl Error { + pub fn i32(&self) -> i32 { + *self as i32 + } +} diff --git a/crates/wasm_v0/src/lib.rs b/crates/wasm_v0/src/lib.rs new file mode 100644 index 0000000..740e752 --- /dev/null +++ b/crates/wasm_v0/src/lib.rs @@ -0,0 +1,3 @@ +pub mod common; +pub mod error; +pub mod v0plus; diff --git a/crates/wasm_v0/src/v0plus.rs b/crates/wasm_v0/src/v0plus.rs new file mode 100644 index 0000000..c93a3a3 --- /dev/null +++ b/crates/wasm_v0/src/v0plus.rs @@ -0,0 +1,220 @@ +use crate::{common::*, error}; + +pub const VERSION: i32 = 0x00000000; // v0plus share the same version number with v0 + +pub struct Dialer { + caller_conn: AsyncFdConn, + remote_conn: AsyncFdConn, +} + +pub struct Listener { + caller_conn: AsyncFdConn, + source_conn: AsyncFdConn, +} +pub struct Relay { + source_conn: AsyncFdConn, + remote_conn: AsyncFdConn, +} + +impl Default for Dialer { + fn default() -> Self { + Self::new() + } +} + +impl Dialer { + pub fn new() -> Self { + Dialer { + caller_conn: AsyncFdConn::new(), + remote_conn: AsyncFdConn::new(), + } + } + + pub fn dial(&mut self, caller_conn_fd: i32) -> Result { + // check if caller_conn_fd is valid + if caller_conn_fd < 0 { + return Err("invalid caller_conn_fd".to_string()); + } + match self.caller_conn.wrap(caller_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // call external dial() to get remote_conn_fd + let remote_conn_fd = unsafe { host_dial() }; + if remote_conn_fd < 0 { + return Err("dial failed".to_string()); + } + match self.remote_conn.wrap(remote_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // return remote_conn_fd + Ok(remote_conn_fd) + } + + // // borrow self.caller_conn + // pub fn caller(&mut self) -> Option<&mut TcpStream> { + // self.caller_conn.stream() + // } + + // // borrow self.remote_conn + // pub fn remote(&mut self) -> Option<&mut TcpStream> { + // self.remote_conn.stream() + // } + + pub fn close(&mut self) { + self.caller_conn.close(); + self.remote_conn.close(); + unsafe { host_defer() }; + } +} + +impl Default for Listener { + fn default() -> Self { + Self::new() + } +} + +impl Listener { + pub fn new() -> Self { + Listener { + caller_conn: AsyncFdConn::new(), + source_conn: AsyncFdConn::new(), + } + } + + pub fn accept(&mut self, caller_conn_fd: i32) -> Result { + // check if caller_conn_fd is valid + if caller_conn_fd < 0 { + return Err("Listener: invalid caller_conn_fd".to_string()); + } + + match self.caller_conn.wrap(caller_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // call external accept() to get source_conn_fd + let source_conn_fd = unsafe { host_accept() }; + if source_conn_fd < 0 { + return Err("Listener: accept failed".to_string()); + } + + match self.source_conn.wrap(source_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // return source_conn_fd + Ok(source_conn_fd) + } + + // // borrow self.caller_conn + // pub fn caller(&mut self) -> Option<&mut TcpStream> { + // self.caller_conn.stream() + // } + + // // borrow self.source_conn + // pub fn source(&mut self) -> Option<&mut TcpStream> { + // self.source_conn.stream() + // } + + pub fn close(&mut self) { + self.caller_conn.close(); + self.source_conn.close(); + unsafe { host_defer() }; + } +} + +impl Default for Relay { + fn default() -> Self { + Self::new() + } +} + +impl Relay { + pub fn new() -> Self { + Relay { + source_conn: AsyncFdConn::new(), + remote_conn: AsyncFdConn::new(), + } + } + + pub fn associate(&mut self) -> Result { + // call external accept() to get source_conn_fd + let source_conn_fd = unsafe { host_accept() }; + if source_conn_fd < 0 { + return Err("Relay: accept failed".to_string()); + } + + match self.source_conn.wrap(source_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // call external dial() to get remote_conn_fd + let remote_conn_fd = unsafe { host_dial() }; + if remote_conn_fd < 0 { + return Err("Relay: dial failed".to_string()); + } + match self.remote_conn.wrap(remote_conn_fd) { + Ok(_) => {} + Err(e) => { + return Err(e); + } + } + + // return remote_conn_fd + Ok(error::Error::None.i32()) + } + + // // borrow self.source_conn + // pub fn source(&mut self) -> Option<&mut TcpStream> { + // self.source_conn.stream() + // } + + // // borrow self.remote_conn + // pub fn remote(&mut self) -> Option<&mut TcpStream> { + // self.remote_conn.stream() + // } + + pub fn close(&mut self) { + self.source_conn.close(); + self.remote_conn.close(); + unsafe { host_defer() }; + } +} + +pub trait ConnPair { + fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)>; +} + +impl ConnPair for Dialer { + fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> { + Some((&mut self.caller_conn, &mut self.remote_conn)) + } +} + +impl ConnPair for Listener { + fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> { + Some((&mut self.caller_conn, &mut self.source_conn)) + } +} + +impl ConnPair for Relay { + fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> { + Some((&mut self.source_conn, &mut self.remote_conn)) + } +} diff --git a/crates/water/src/config/mod.rs b/crates/water/src/config/mod.rs index d5630f4..ad17958 100644 --- a/crates/water/src/config/mod.rs +++ b/crates/water/src/config/mod.rs @@ -1,5 +1,6 @@ pub mod wasm_shared_config; +#[derive(Clone)] pub struct WATERConfig { pub filepath: String, pub entry_fn: String, diff --git a/crates/water/src/runtime/client.rs b/crates/water/src/runtime/client.rs index 91cc23c..c2041c7 100644 --- a/crates/water/src/runtime/client.rs +++ b/crates/water/src/runtime/client.rs @@ -81,6 +81,32 @@ impl WATERClient { }) } + pub fn keep_listen(&mut self) -> Result { + info!("[HOST] WATERClient keep listening...",); + + let water = match &mut self.stream { + WATERClientType::Listener(ref mut listener) => WATERClientType::Listener(Box::new( + v0::listener::WATERListener::migrate_listener(&self.config, listener.get_core())?, + ) + as Box), + WATERClientType::Relay(ref mut relay) => WATERClientType::Relay(Box::new( + v0::relay::WATERRelay::migrate_listener(&self.config, relay.get_core())?, + ) + as Box), + _ => { + return Err(anyhow::anyhow!( + "[HOST] This client is neither a Listener nor a Relay" + )); + } + }; + + Ok(WATERClient { + config: self.config.clone(), + debug: self.debug, + stream: water, + }) + } + pub fn set_debug(&mut self, debug: bool) { self.debug = debug; } @@ -163,16 +189,13 @@ impl WATERClient { match &mut self.stream { WATERClientType::Dialer(dialer) => dialer.run_entry_fn(&self.config), - WATERClientType::Listener(listener) => { - // TODO: clone listener here, since we are doing one WATM instance / accept - listener.run_entry_fn(&self.config) - } + WATERClientType::Listener(listener) => listener.run_entry_fn(&self.config), WATERClientType::Relay(relay) => relay.run_entry_fn(&self.config), _ => Err(anyhow::anyhow!("This client is not a Runner")), } } - // this will run the extry_fn(WATM) in the current thread -- replace Host when running + // this will run the entry_fn(WATM) in the current thread -- replace Host when running pub fn execute(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient Executing ..."); diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index e79b9ac..41bb68f 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -23,16 +23,20 @@ impl H2O { pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERCore H2O initing..."); - let mut wasm_config = wasmtime::Config::new(); - wasm_config.wasm_threads(true); + let wasm_config = wasmtime::Config::new(); + + #[cfg(feature = "multithread")] + { + wasm_config.wasm_threads(true); + } let engine = Engine::new(&wasm_config)?; - let mut linker: Linker = Linker::new(&engine); + let linker: Linker = Linker::new(&engine); let module = Module::from_file(&engine, &conf.filepath)?; let host = Host::default(); - let mut store = Store::new(&engine, host); + let store = Store::new(&engine, host); let mut error_occured = None; @@ -64,6 +68,17 @@ impl H2O { return Err(anyhow::Error::msg("WATM module version not found")); } + Self::create_core(conf, linker, store, module, engine, version) + } + + pub fn create_core( + conf: &WATERConfig, + mut linker: Linker, + mut store: Store, + module: Module, + engine: Engine, + version: Option, + ) -> Result { store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); if store.data().preview1_ctx.is_none() { @@ -93,22 +108,9 @@ impl H2O { match &version { Some(Version::V0(ref config)) => match config { Some(v0_conf) => { - // let v0_conf = Arc::new(Mutex::new(v0_conf.clone())); v0::funcs::export_tcp_connect(&mut linker, Arc::clone(v0_conf))?; v0::funcs::export_accept(&mut linker, Arc::clone(v0_conf))?; v0::funcs::export_defer(&mut linker, Arc::clone(v0_conf))?; - - // // if client_type is Listen, then create a listener with the same config - // if conf.client_type == WaterBinType::Listen { - // match v0_conf.lock() { - // Ok(mut v0_conf) => { - // v0_conf.create_listener()?; - // } - // Err(e) => { - // return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; - // } - // } - // } } None => { return Err(anyhow::anyhow!( @@ -148,6 +150,57 @@ impl H2O { }) } + // This function is for migrating the v0 core for listener and relay + // to handle every new connection is creating a new separate core (as v0 spec) + pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERCore H2O v0_migrating..."); + + // reseting the listener accepted_fd or the relay's accepted_fd & dial_fd + // when migrating from existed listener / relay + let version = match &core.version { + Version::V0(v0conf) => { + match v0conf { + Some(og_v0_conf) => match og_v0_conf.lock() { + Ok(og_v0_conf) => { + let mut new_v0_conf_inner = og_v0_conf.clone(); + // reset the new cloned v0conf + new_v0_conf_inner.reset_listener_or_relay(); + + Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner)))) + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + } + }, + None => { + return Err(anyhow::anyhow!("v0_conf is None"))?; + } + } + } + _ => { + return Err(anyhow::anyhow!("This is not a V0 core"))?; + } + }; + + // NOTE: Some of the followings can reuse the existing core, leave to later explore + let wasm_config = wasmtime::Config::new(); + + #[cfg(feature = "multithread")] + { + wasm_config.wasm_threads(true); + } + + let engine = Engine::new(&wasm_config)?; + let linker: Linker = Linker::new(&engine); + + let module = Module::from_file(&engine, &conf.filepath)?; + + let host = Host::default(); + let store = Store::new(&engine, host); + + Self::create_core(conf, linker, store, module, engine, Some(version)) + } + pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { self._init(conf.debug)?; self._process_config(conf)?; // This is for now needed only by v1_preview diff --git a/crates/water/src/runtime/transport.rs b/crates/water/src/runtime/transport.rs index 86e079d..e0a6a0b 100644 --- a/crates/water/src/runtime/transport.rs +++ b/crates/water/src/runtime/transport.rs @@ -190,30 +190,4 @@ pub trait WATERTransportTrait: Send { Ok(handle) } - - // fn read(&mut self, _buf: &mut Vec) -> Result { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // fn write(&mut self, _buf: &[u8]) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn cancel_with(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn cancel(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn run_entry_fn( - // &mut self, - // _conf: &WATERConfig, - // ) -> Result>, anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } } diff --git a/crates/water/src/runtime/v0/config.rs b/crates/water/src/runtime/v0/config.rs index 45b9617..22058cb 100644 --- a/crates/water/src/runtime/v0/config.rs +++ b/crates/water/src/runtime/v0/config.rs @@ -51,8 +51,8 @@ impl Config { pub enum V0CRole { Unknown, Dialer(i32), - Listener(i32), - Relay(i32, i32), // listener_fd, dialer_fd + Listener(i32, i32), // listener_fd, accepted_fd + Relay(i32, i32, i32), // listener_fd, accepted_fd, dialer_fd } // V0 specific configurations @@ -93,8 +93,12 @@ impl V0Config { info!("[HOST] WATERCore V0 connecting to {}", addr); match &mut self.conn { - V0CRole::Relay(_lis, ref mut conn_fd) => { + V0CRole::Relay(_, _, ref mut conn_fd) => { // now relay has been built, need to dial + if *conn_fd != -1 { + return Err(anyhow::Error::msg("Relay already connected")); + } + let conn = std::net::TcpStream::connect(addr)?; *conn_fd = conn.as_raw_fd(); Ok(conn) @@ -116,9 +120,9 @@ impl V0Config { let listener = std::net::TcpListener::bind(addr)?; if is_relay { - self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); + self.conn = V0CRole::Relay(listener.into_raw_fd(), -1, -1); } else { - self.conn = V0CRole::Listener(listener.into_raw_fd()); + self.conn = V0CRole::Listener(listener.into_raw_fd(), -1); } Ok(()) } @@ -126,17 +130,30 @@ impl V0Config { pub fn accept(&mut self) -> Result { info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn); - match &self.conn { - V0CRole::Listener(listener) => { - let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + match self.conn { + V0CRole::Listener(ref mut listener_fd, ref mut accepted_fd) => { + if *accepted_fd != -1 { + return Err(anyhow::Error::msg("Listener already accepted")); + } + + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) }; + let (stream, _) = listener.accept()?; - self.conn = V0CRole::Listener(listener.into_raw_fd()); // makde sure it is not closed after scope + + *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *accepted_fd = stream.as_raw_fd(); + Ok(stream) } - V0CRole::Relay(listener, _) => { - let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + V0CRole::Relay(ref mut listener_fd, ref mut accepted_fd, _) => { + if *accepted_fd != -1 { + return Err(anyhow::Error::msg("Relay already accepted")); + } + + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) }; let (stream, _) = listener.accept()?; - self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); // makde sure it is not closed after scope + *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *accepted_fd = stream.as_raw_fd(); Ok(stream) } _ => Err(anyhow::Error::msg("not a listener")), @@ -146,22 +163,56 @@ impl V0Config { pub fn defer(&mut self) { info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn); - match &self.conn { - V0CRole::Listener(_listener) => { - // TODO: Listener shouldn't be deferred, but the stream it connected to should be - // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; - // drop(listener); + match self.conn { + V0CRole::Listener(_, ref mut accepted_fd) => { + // The accepted stream should be defered, not the listener + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default } - V0CRole::Dialer(conn) => { - let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + V0CRole::Dialer(conn_fd) => { + let conn = unsafe { std::net::TcpStream::from_raw_fd(conn_fd) }; drop(conn); } - V0CRole::Relay(_listener, conn) => { - // Listener shouldn't be deferred, like the above reason - // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; - // drop(listener); - let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) }; drop(conn); + *conn_fd = -1; // set it back to default + } + _ => {} + } + } + + pub fn reset_listener_or_relay(&mut self) { + info!( + "[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...", + self.conn + ); + + match self.conn { + V0CRole::Listener(_, ref mut accepted_fd) => { + if *accepted_fd != -1 { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + } + } + V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => { + if *accepted_fd != -1 { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + } + + if *conn_fd != -1 { + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) }; + drop(conn); + *conn_fd = -1; // set it back to default + } } _ => {} } diff --git a/crates/water/src/runtime/v0/listener.rs b/crates/water/src/runtime/v0/listener.rs index 685f507..c87da97 100644 --- a/crates/water/src/runtime/v0/listener.rs +++ b/crates/water/src/runtime/v0/listener.rs @@ -129,4 +129,14 @@ impl WATERListener { Ok(runtime) } + + pub fn migrate_listener(_conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERListener v0 migrating listener..."); + + let mut new_core = + core::H2O::v0_migrate_core(_conf, core).context("Failed to migrate core")?; + new_core._prepare(_conf)?; + + WATERListener::init(_conf, new_core) + } } diff --git a/crates/water/src/runtime/v0/relay.rs b/crates/water/src/runtime/v0/relay.rs index b97fea1..cb437a6 100644 --- a/crates/water/src/runtime/v0/relay.rs +++ b/crates/water/src/runtime/v0/relay.rs @@ -108,4 +108,14 @@ impl WATERRelay { Ok(runtime) } + + pub fn migrate_listener(_conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERelay v0 migrating listener..."); + + let mut new_core = + core::H2O::v0_migrate_core(_conf, core).context("Failed to migrate core")?; + new_core._prepare(_conf)?; + + WATERRelay::init(_conf, new_core) + } } diff --git a/crates/water/src/runtime/v1/listener.rs b/crates/water/src/runtime/v1/listener.rs index b18fdcf..8992782 100644 --- a/crates/water/src/runtime/v1/listener.rs +++ b/crates/water/src/runtime/v1/listener.rs @@ -40,7 +40,7 @@ impl WATERTransportTrait for WATERListener { } } - let nums: i64 = match res.get(0) { + let nums: i64 = match res.first() { Some(wasmtime::Val::I64(v)) => *v, _ => { return Err(anyhow::Error::msg(format!( @@ -91,7 +91,7 @@ impl WATERTransportTrait for WATERListener { let mut res = vec![Val::I64(0)]; match self.writer.call(&mut *store, ¶ms, &mut res) { Ok(_) => { - match res.get(0) { + match res.first() { Some(wasmtime::Val::I64(v)) => { if *v != buf.len() as i64 { return Err(anyhow::Error::msg(format!( diff --git a/crates/water/src/runtime/v1/stream.rs b/crates/water/src/runtime/v1/stream.rs index a33c3a4..251eaf5 100644 --- a/crates/water/src/runtime/v1/stream.rs +++ b/crates/water/src/runtime/v1/stream.rs @@ -50,7 +50,7 @@ impl WATERTransportTrait for WATERStream { } } - let nums: i64 = match res.get(0) { + let nums: i64 = match res.first() { Some(wasmtime::Val::I64(v)) => *v, _ => { return Err(anyhow::Error::msg(format!( @@ -101,7 +101,7 @@ impl WATERTransportTrait for WATERStream { let mut res = vec![Val::I64(0)]; match self.writer.call(&mut *store, ¶ms, &mut res) { Ok(_) => { - match res.get(0) { + match res.first() { Some(wasmtime::Val::I64(v)) => { if *v != buf.len() as i64 { return Err(anyhow::Error::msg(format!( diff --git a/examples/water_bins/plain_v0/.cargo/config b/examples/water_bins/plain_v0/.cargo/config new file mode 100644 index 0000000..bca99d0 --- /dev/null +++ b/examples/water_bins/plain_v0/.cargo/config @@ -0,0 +1,5 @@ +[build] +target = "wasm32-wasi" + +[target.wasm32-wasi] +rustflags = [ "--cfg", "tokio_unstable"] diff --git a/examples/water_bins/plain_v0/Cargo.toml b/examples/water_bins/plain_v0/Cargo.toml new file mode 100644 index 0000000..c4d36b6 --- /dev/null +++ b/examples/water_bins/plain_v0/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "plain" +version = "0.1.0" +authors.workspace = true +description.workspace = true +edition.workspace = true +publish = false + +[lib] +name = "plain" +path = "src/lib.rs" +crate-type = ["cdylib"] + +[dependencies] +hex = "0.4.3" +lazy_static = "1.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] } + +# water wasm lib import +water-wasm-v0 = { path = "../../../crates/wasm_v0/", version = "0.1.0" } \ No newline at end of file diff --git a/examples/water_bins/plain_v0/plain.wasm b/examples/water_bins/plain_v0/plain.wasm new file mode 100644 index 0000000..375787f Binary files /dev/null and b/examples/water_bins/plain_v0/plain.wasm differ diff --git a/examples/water_bins/plain_v0/src/lib.rs b/examples/water_bins/plain_v0/src/lib.rs new file mode 100644 index 0000000..3eb0b3b --- /dev/null +++ b/examples/water_bins/plain_v0/src/lib.rs @@ -0,0 +1,278 @@ +use water_wasm_v0::*; + +use lazy_static::lazy_static; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::time::Duration; +use tokio::{ + self, + io::{AsyncReadExt, AsyncWriteExt}, +}; +use v0plus::ConnPair; + +const READ_BUFFER_SIZE: usize = 1024; // 1KB is shorter than common MTU but longer than common TCP MSS + +lazy_static! { + static ref ROLE: Arc> = Arc::new(Mutex::new(common::Role::Unknown)); + static ref DIALER: Arc> = Arc::new(Mutex::new(v0plus::Dialer::new())); + static ref LISTENER: Arc> = + Arc::new(Mutex::new(v0plus::Listener::new())); + static ref RELAY: Arc> = Arc::new(Mutex::new(v0plus::Relay::new())); + static ref CANCEL: Arc> = + Arc::new(Mutex::new(common::AsyncFdConn::new())); +} + +#[export_name = "_water_v0"] +pub static VERSION: i32 = v0plus::VERSION; + +// version-independent API +#[export_name = "_water_init"] +pub fn _init() -> i32 { + // do all the initializing work here AND pull config from host + sleep(Duration::from_millis(10)); // sleep for 10ms + error::Error::None.i32() +} + +// V0 API +#[export_name = "_water_dial"] +pub fn _dial(caller_conn_fd: i32) -> i32 { + println!("Dialer: dialing..."); + + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_dial: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Dialer + *role = common::Role::Dialer; + + let mut dialer = DIALER.lock().unwrap(); + match dialer.dial(caller_conn_fd) { + Ok(remote_conn_fd) => { + println!("Dialer: dial succeeded"); + remote_conn_fd + } + Err(e) => { + println!("Dialer: dial failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0 API +#[export_name = "_water_accept"] +pub fn _accept(caller_conn_fd: i32) -> i32 { + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_accept: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Listener + *role = common::Role::Listener; + + let mut listener = LISTENER.lock().unwrap(); + match listener.accept(caller_conn_fd) { + Ok(source_conn_fd) => { + println!("Listener: accept succeeded"); + source_conn_fd + } + Err(e) => { + println!("Listener: listen failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0+ API +#[export_name = "_water_associate"] +pub fn _associate() -> i32 { + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_accept: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Relay + *role = common::Role::Relay; + + let mut relay = RELAY.lock().unwrap(); + + match relay.associate() { + Ok(_) => { + println!("Relay: associate succeeded"); + error::Error::None.i32() + } + Err(e) => { + println!("Relay: associate failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0+ API +#[export_name = "_water_cancel_with"] +pub fn _cancel_with(fd: i32) -> i32 { + // check ROLE, if not set, return -1 + let role = ROLE.lock().unwrap(); + if *role == common::Role::Unknown { + println!("_cancel_with: role is not set"); + return error::Error::NotInitialized.i32(); + } + + // check CANCEL, if set, return -1 + let mut cancel = CANCEL.lock().unwrap(); + // set CANCEL + match cancel.wrap(fd) { + Ok(_) => { + println!("_cancel_with: cancel set to {}", fd); + error::Error::None.i32() + } + Err(e) => { + println!("_cancel_with: cancel set failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +/// WASM Entry point here +#[export_name = "_water_worker"] +pub fn _worker() -> i32 { + // borrow CANCEL as &mut AsyncFdConn + let mut cancel = CANCEL.lock().unwrap(); + let cancel = cancel.deref_mut(); + + // check role + let role = ROLE.lock().unwrap(); + match *role { + common::Role::Dialer => { + let mut dialer = DIALER.lock().unwrap(); + let conn_pair = dialer.conn_pair().expect("Dialer: conn_pair is None"); + let caller = conn_pair.0; + let remote = conn_pair.1; + match bidi_worker(remote, caller, cancel) { + Ok(_) => { + dialer.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Dialer: bidi_worker failed: {}", e); + dialer.close(); + error::Error::FailedIO.i32() + } + } + } + common::Role::Listener => { + let mut listener = LISTENER.lock().unwrap(); + let conn_pair = listener.conn_pair().expect("Listener: conn_pair is None"); + let caller = conn_pair.0; + let source = conn_pair.1; + match bidi_worker(source, caller, cancel) { + Ok(_) => { + listener.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Listener: bidi_worker failed: {}", e); + listener.close(); + error::Error::FailedIO.i32() + } + } + } + common::Role::Relay => { + let mut relay = RELAY.lock().unwrap(); + let conn_pair = relay.conn_pair().expect("Relay: conn_pair is None"); + let source = conn_pair.0; + let remote = conn_pair.1; + match bidi_worker(remote, source, cancel) { + Ok(_) => { + relay.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Relay: bidi_worker failed: {}", e); + relay.close(); + error::Error::FailedIO.i32() + } + } + } + _ => { + println!("_worker: role is not set"); + error::Error::NotInitialized.i32() + } + } +} + +#[tokio::main(flavor = "current_thread")] +async fn bidi_worker( + dst: &mut common::AsyncFdConn, + src: &mut common::AsyncFdConn, + cancel: &mut common::AsyncFdConn, +) -> std::io::Result<()> { + // upgrade to AsyncFdConn + dst.tokio_upgrade().expect("dst upgrade failed"); + src.tokio_upgrade().expect("src upgrade failed"); + cancel.tokio_upgrade().expect("cancel upgrade failed"); + + let dst: &mut tokio::net::TcpStream = dst.stream().expect("dst stream is None"); + let src: &mut tokio::net::TcpStream = src.stream().expect("src stream is None"); + let cancel: &mut tokio::net::TcpStream = cancel.stream().expect("cancel stream is None"); + + // dst.set_nodelay(true).expect("dst set_nodelay failed"); + // src.set_nodelay(true).expect("src set_nodelay failed"); + + let mut dst_buf = vec![0; READ_BUFFER_SIZE]; + let mut src_buf = vec![0; READ_BUFFER_SIZE]; + let mut cancel_buf = vec![0; 256]; + + loop { + tokio::select! { + result = dst.read(&mut dst_buf) => { + // println!("dst.read() result = {:?}", result); + match result { + Ok(0) => break, // End of stream + Ok(n) => { + if let Err(e) = src.write_all(&dst_buf[0..n]).await { + println!("Error writing to src: {:?}", e); + return Err(e); + } + } + Err(e) => { + println!("Error reading from dst: {:?}", e); + return Err(e); + } + } + } + + result = src.read(&mut src_buf) => { + // println!("src.read() result = {:?}", result); + match result { + Ok(0) => break, // End of stream + Ok(n) => { + if let Err(e) = dst.write_all(&src_buf[0..n]).await { + println!("Error writing to dst: {:?}", e); + return Err(e); + } + } + Err(e) => { + println!("Error reading from src: {:?}", e); + return Err(e); + } + } + } + + result = cancel.read(&mut cancel_buf) => { + println!("cancel.read() result = {:?}", result); + // exit + break; + } + } + } + + Ok(()) +} diff --git a/examples/water_bins/reverse_v0/.cargo/config b/examples/water_bins/reverse_v0/.cargo/config new file mode 100644 index 0000000..bca99d0 --- /dev/null +++ b/examples/water_bins/reverse_v0/.cargo/config @@ -0,0 +1,5 @@ +[build] +target = "wasm32-wasi" + +[target.wasm32-wasi] +rustflags = [ "--cfg", "tokio_unstable"] diff --git a/examples/water_bins/reverse_v0/Cargo.toml b/examples/water_bins/reverse_v0/Cargo.toml new file mode 100644 index 0000000..6a38f52 --- /dev/null +++ b/examples/water_bins/reverse_v0/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reverse" +version = "0.2.0" +authors.workspace = true +description.workspace = true +edition.workspace = true +publish = false + +[lib] +name = "reverse" +path = "src/lib.rs" +crate-type = ["cdylib"] + +[dependencies] +hex = "0.4.3" +lazy_static = "1.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] } + +# water wasm lib import +water-wasm-v0 = { path = "../../../crates/wasm_v0/", version = "0.1.0" } \ No newline at end of file diff --git a/examples/water_bins/reverse_v0/reverse.wasm b/examples/water_bins/reverse_v0/reverse.wasm new file mode 100644 index 0000000..f304c5f Binary files /dev/null and b/examples/water_bins/reverse_v0/reverse.wasm differ diff --git a/examples/water_bins/reverse_v0/src/lib.rs b/examples/water_bins/reverse_v0/src/lib.rs new file mode 100644 index 0000000..9798746 --- /dev/null +++ b/examples/water_bins/reverse_v0/src/lib.rs @@ -0,0 +1,280 @@ +use water_wasm_v0::*; + +use lazy_static::lazy_static; +use std::ops::DerefMut; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::time::Duration; +use tokio::{ + self, + io::{AsyncReadExt, AsyncWriteExt}, +}; +use v0plus::ConnPair; + +const READ_BUFFER_SIZE: usize = 1024; // 1KB is shorter than common MTU but longer than common TCP MSS + +lazy_static! { + static ref ROLE: Arc> = Arc::new(Mutex::new(common::Role::Unknown)); + static ref DIALER: Arc> = Arc::new(Mutex::new(v0plus::Dialer::new())); + static ref LISTENER: Arc> = + Arc::new(Mutex::new(v0plus::Listener::new())); + static ref RELAY: Arc> = Arc::new(Mutex::new(v0plus::Relay::new())); + static ref CANCEL: Arc> = + Arc::new(Mutex::new(common::AsyncFdConn::new())); +} + +#[export_name = "_water_v0"] +pub static VERSION: i32 = v0plus::VERSION; + +// version-independent API +#[export_name = "_water_init"] +pub fn _init() -> i32 { + // do all the initializing work here AND pull config from host + sleep(Duration::from_millis(10)); // sleep for 10ms + error::Error::None.i32() +} + +// V0 API +#[export_name = "_water_dial"] +pub fn _dial(caller_conn_fd: i32) -> i32 { + println!("Dialer: dialing..."); + + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_dial: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Dialer + *role = common::Role::Dialer; + + let mut dialer = DIALER.lock().unwrap(); + match dialer.dial(caller_conn_fd) { + Ok(remote_conn_fd) => { + println!("Dialer: dial succeeded"); + remote_conn_fd + } + Err(e) => { + println!("Dialer: dial failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0 API +#[export_name = "_water_accept"] +pub fn _accept(caller_conn_fd: i32) -> i32 { + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_accept: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Listener + *role = common::Role::Listener; + + let mut listener = LISTENER.lock().unwrap(); + match listener.accept(caller_conn_fd) { + Ok(source_conn_fd) => { + println!("Listener: accept succeeded"); + source_conn_fd + } + Err(e) => { + println!("Listener: listen failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0+ API +#[export_name = "_water_associate"] +pub fn _associate() -> i32 { + // check ROLE, if set, return -1 + let mut role = ROLE.lock().unwrap(); + if *role != common::Role::Unknown { + println!("_accept: role is already set to {:?}", *role); + return error::Error::DoubleInit.i32(); + } + + // set ROLE to Relay + *role = common::Role::Relay; + + let mut relay = RELAY.lock().unwrap(); + + match relay.associate() { + Ok(_) => { + println!("Relay: associate succeeded"); + error::Error::None.i32() + } + Err(e) => { + println!("Relay: associate failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +// V0+ API +#[export_name = "_water_cancel_with"] +pub fn _cancel_with(fd: i32) -> i32 { + // check ROLE, if not set, return -1 + let role = ROLE.lock().unwrap(); + if *role == common::Role::Unknown { + println!("_cancel_with: role is not set"); + return error::Error::NotInitialized.i32(); + } + + // check CANCEL, if set, return -1 + let mut cancel = CANCEL.lock().unwrap(); + // set CANCEL + match cancel.wrap(fd) { + Ok(_) => { + println!("_cancel_with: cancel set to {}", fd); + error::Error::None.i32() + } + Err(e) => { + println!("_cancel_with: cancel set failed: {}", e); + error::Error::Unknown.i32() + } + } +} + +/// WASM Entry point here +#[export_name = "_water_worker"] +pub fn _worker() -> i32 { + // borrow CANCEL as &mut AsyncFdConn + let mut cancel = CANCEL.lock().unwrap(); + let cancel = cancel.deref_mut(); + + // check role + let role = ROLE.lock().unwrap(); + match *role { + common::Role::Dialer => { + let mut dialer = DIALER.lock().unwrap(); + let conn_pair = dialer.conn_pair().expect("Dialer: conn_pair is None"); + let caller = conn_pair.0; + let remote = conn_pair.1; + match bidi_worker(remote, caller, cancel) { + Ok(_) => { + dialer.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Dialer: bidi_worker failed: {}", e); + dialer.close(); + error::Error::FailedIO.i32() + } + } + } + common::Role::Listener => { + let mut listener = LISTENER.lock().unwrap(); + let conn_pair = listener.conn_pair().expect("Listener: conn_pair is None"); + let caller = conn_pair.0; + let source = conn_pair.1; + match bidi_worker(source, caller, cancel) { + Ok(_) => { + listener.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Listener: bidi_worker failed: {}", e); + listener.close(); + error::Error::FailedIO.i32() + } + } + } + common::Role::Relay => { + let mut relay = RELAY.lock().unwrap(); + let conn_pair = relay.conn_pair().expect("Relay: conn_pair is None"); + let source = conn_pair.0; + let remote = conn_pair.1; + match bidi_worker(remote, source, cancel) { + Ok(_) => { + relay.close(); + error::Error::None.i32() + } + Err(e) => { + println!("Relay: bidi_worker failed: {}", e); + relay.close(); + error::Error::FailedIO.i32() + } + } + } + _ => { + println!("_worker: role is not set"); + error::Error::NotInitialized.i32() + } + } +} + +#[tokio::main(flavor = "current_thread")] +async fn bidi_worker( + dst: &mut common::AsyncFdConn, + src: &mut common::AsyncFdConn, + cancel: &mut common::AsyncFdConn, +) -> std::io::Result<()> { + // upgrade to AsyncFdConn + dst.tokio_upgrade().expect("dst upgrade failed"); + src.tokio_upgrade().expect("src upgrade failed"); + cancel.tokio_upgrade().expect("cancel upgrade failed"); + + let dst: &mut tokio::net::TcpStream = dst.stream().expect("dst stream is None"); + let src: &mut tokio::net::TcpStream = src.stream().expect("src stream is None"); + let cancel: &mut tokio::net::TcpStream = cancel.stream().expect("cancel stream is None"); + + // dst.set_nodelay(true).expect("dst set_nodelay failed"); + // src.set_nodelay(true).expect("src set_nodelay failed"); + + let mut dst_buf = vec![0; READ_BUFFER_SIZE]; + let mut src_buf = vec![0; READ_BUFFER_SIZE]; + let mut cancel_buf = vec![0; 256]; + + loop { + tokio::select! { + result = dst.read(&mut dst_buf) => { + // println!("dst.read() result = {:?}", result); + match result { + Ok(0) => break, // End of stream + Ok(n) => { + dst_buf[0..n].reverse(); + if let Err(e) = src.write_all(&dst_buf[0..n]).await { + println!("Error writing to src: {:?}", e); + return Err(e); + } + } + Err(e) => { + println!("Error reading from dst: {:?}", e); + return Err(e); + } + } + } + + result = src.read(&mut src_buf) => { + // println!("src.read() result = {:?}", result); + match result { + Ok(0) => break, // End of stream + Ok(n) => { + src_buf[0..n].reverse(); + if let Err(e) = dst.write_all(&src_buf[0..n]).await { + println!("Error writing to dst: {:?}", e); + return Err(e); + } + } + Err(e) => { + println!("Error reading from src: {:?}", e); + return Err(e); + } + } + } + + result = cancel.read(&mut cancel_buf) => { + println!("cancel.read() result = {:?}", result); + // exit + break; + } + } + } + + Ok(()) +} diff --git a/plain.wasm b/plain.wasm deleted file mode 100644 index 77e1350..0000000 Binary files a/plain.wasm and /dev/null differ diff --git a/scripts/build_wasm_targets.sh b/scripts/build_wasm_targets.sh new file mode 100644 index 0000000..5fd4111 --- /dev/null +++ b/scripts/build_wasm_targets.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -e # Exit on any error + +members=( + "crates/wasm" + "examples/water_bins/ss_client_wasm_v1" + "examples/water_bins/echo_client" + "examples/water_bins/plain_v0" + "examples/water_bins/reverse_v0" +) + +for member in "${members[@]}"; do + pushd $member + cargo build --verbose --target wasm32-wasi + popd +done \ No newline at end of file diff --git a/tests/test_wasm/plain.wasm b/tests/test_wasm/plain.wasm index 8c5a90b..77e1350 100644 Binary files a/tests/test_wasm/plain.wasm and b/tests/test_wasm/plain.wasm differ diff --git a/tests/test_wasm/ss_client_wasm.wasm b/tests/test_wasm/ss_client_wasm.wasm old mode 100644 new mode 100755 index 11e5042..2e65958 Binary files a/tests/test_wasm/ss_client_wasm.wasm and b/tests/test_wasm/ss_client_wasm.wasm differ diff --git a/tests/tests/cross_lang_tests.rs b/tests/tests/cross_lang_tests.rs index fe341a1..ac06172 100644 --- a/tests/tests/cross_lang_tests.rs +++ b/tests/tests/cross_lang_tests.rs @@ -177,3 +177,58 @@ fn test_cross_lang_wasm_listener() -> Result<(), Box> { Ok(()) } + +// #[test] +// fn test_cross_lang_wasm_multi_listener() -> Result<(), Box> { +// // tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + +// let cfg_str = r#" +// { +// "remote_address": "127.0.0.1", +// "remote_port": 8088, +// "local_address": "127.0.0.1", +// "local_port": 8084 +// } +// "#; +// // Create a directory inside of `std::env::temp_dir()`. +// let dir = tempdir()?; +// let file_path = dir.path().join("temp-config.txt"); +// let mut file = File::create(&file_path)?; +// writeln!(file, "{}", cfg_str)?; + +// let conf = config::WATERConfig::init( +// // plain.wasm is in v0 and fully compatible with the Go engine +// // More details for the Go-side of running plain.wasm check here: +// // https://github.com/gaukas/water/tree/master/examples/v0/plain +// // +// // More details for the implementation of plain.wasm check this PR: +// // https://github.com/erikziyunchi/water-rs/pull/10 +// // +// String::from("./test_wasm/plain.wasm"), +// String::from("_water_worker"), +// String::from(file_path.to_string_lossy()), +// config::WaterBinType::Listen, +// true, +// ) +// .unwrap(); + +// let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); +// water_client.listen().unwrap(); + +// water_client.accept().unwrap(); +// water_client.cancel_with().unwrap(); +// let mut handler = water_client.run_worker().unwrap(); + +// for i in 0..5 { +// handler.join().unwrap(); +// let mut new_water = water_client.keep_listen().unwrap(); +// new_water.accept().unwrap(); +// new_water.cancel_with().unwrap(); +// handler = new_water.run_worker().unwrap(); +// } + +// drop(file); +// dir.close()?; + +// Ok(()) +// } diff --git a/tests/tests/spinning_relay.rs b/tests/tests/spinning_relay.rs index cedd557..366218c 100644 --- a/tests/tests/spinning_relay.rs +++ b/tests/tests/spinning_relay.rs @@ -158,7 +158,26 @@ fn spin_cross_lang_wasm_relay() -> Result<(), Box> { water_client.associate().unwrap(); water_client.cancel_with().unwrap(); - let handle_water = water_client.run_worker().unwrap(); + let mut handle_water = water_client.run_worker().unwrap(); + + for _i in 0..5 { + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + let mut new_water = water_client.keep_listen().unwrap(); + // no need to call relay again, since relay() is also creating the listener + new_water.associate().unwrap(); + new_water.cancel_with().unwrap(); + handle_water = new_water.run_worker().unwrap(); + } std::thread::sleep(std::time::Duration::from_secs(20)); @@ -166,16 +185,6 @@ fn spin_cross_lang_wasm_relay() -> Result<(), Box> { drop(file); dir.close()?; - match handle_water.join().unwrap() { - Ok(_) => {} - Err(e) => { - eprintln!("Running _water_worker ERROR: {}", e); - return Err(Box::new(Error::new( - ErrorKind::Other, - "Failed to join _water_worker thread", - ))); - } - }; Ok(()) } diff --git a/tests/tests/ss_testing.rs b/tests/tests/ss_testing.rs index 8e7fa1d..cfb0e1b 100644 --- a/tests/tests/ss_testing.rs +++ b/tests/tests/ss_testing.rs @@ -121,7 +121,7 @@ async fn wasm_managed_shadowsocks_async() -> Result<(), Box Result<(), Box Result<(), Box