Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI tool update #20

Merged
merged 36 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0063bb6
update: change namespaces to prefix
Oct 23, 2023
582d24d
update: v0+ API matching + plain runnable with Dialer
Oct 30, 2023
0302fa0
update: lint
Oct 30, 2023
d31964c
fix: fmt
Oct 30, 2023
400eb32
fix: clippy
Oct 30, 2023
177cbc0
fix: resolve conflicts from the prev PR
Nov 1, 2023
f2f1d72
update: keep the library code safe
Nov 1, 2023
fdec4f1
update: cleaning code + more sensible comments
erikziyunchi Nov 5, 2023
330f987
fix: clippy
erikziyunchi Nov 5, 2023
1e5f376
update: remove dup .wasm + link go-side runner for plain.wasm & PR fo…
erikziyunchi Nov 5, 2023
493061d
update: adding listener(base), tests, refactoring
erikziyunchi Nov 9, 2023
8447289
fix: fmt
erikziyunchi Nov 9, 2023
044eff6
fix: clippy
erikziyunchi Nov 9, 2023
cfddea1
fix: conflict
erikziyunchi Nov 9, 2023
20b832d
update: add relay, test
erikziyunchi Nov 10, 2023
8bbc3c7
fix: sleep 1 sec before terminating relay to allow the test finish + …
erikziyunchi Nov 10, 2023
d8bfa5f
fix: clippy & give a bit more time before terminate for relay test
erikziyunchi Nov 10, 2023
c2010a5
fix: isolate relay test with another test file
erikziyunchi Nov 10, 2023
3f569a3
fix: fmt
erikziyunchi Nov 10, 2023
90b5d93
fix: change port in relay test
erikziyunchi Nov 10, 2023
f6e7b46
update: add multi listener for v0
erikziyunchi Dec 18, 2023
cd43711
update: lint
erikziyunchi Dec 18, 2023
9e38b7a
fix: resolve conflict
erikziyunchi Dec 19, 2023
aa3bed6
fix: lint
erikziyunchi Dec 19, 2023
b169832
fix: new ss wasm binary + change port in test
erikziyunchi Dec 19, 2023
664e389
fix: fmt
erikziyunchi Dec 19, 2023
799932c
update: working cli (v1 runner mode)
erikziyunchi Dec 20, 2023
f2ac547
fmt + lint
erikziyunchi Dec 20, 2023
128a35b
resolve conflict
erikziyunchi Jan 9, 2024
a108dd8
update: cli tool
erikziyunchi Jan 9, 2024
c422064
fix: fmt
erikziyunchi Jan 9, 2024
a6fe4e8
update: fix small v0 multi listener bug + v0 multi listener tests + m…
erikziyunchi Jan 9, 2024
2a575d4
fix: modify test for v0 multi listener
erikziyunchi Jan 10, 2024
54162d2
update: cli tool for all roles, docs
erikziyunchi Jan 10, 2024
ac599e9
clean up
erikziyunchi Jan 10, 2024
beab99c
remove 🚧 in cli readme [skip ci]
erikziyunchi Jan 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions crates/water/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod wasm_shared_config;

/// WATER configuration
#[derive(Clone)]
pub struct WATERConfig {
pub filepath: String,
pub entry_fn: String,
Expand Down Expand Up @@ -27,24 +29,25 @@ impl WATERConfig {
}
}

/// WATER client type
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WaterBinType {
Unknown,
Wrap,
Dial,
Listen,
Relay,
Runner,
Wrap,
Unknown,
}

impl From<u32> for WaterBinType {
fn from(num: u32) -> Self {
match num {
0 => WaterBinType::Dial,
1 => WaterBinType::Listen,
2 => WaterBinType::Runner,
3 => WaterBinType::Wrap,
4 => WaterBinType::Relay,
2 => WaterBinType::Relay,
3 => WaterBinType::Runner,
4 => WaterBinType::Wrap,
_ => WaterBinType::Unknown,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/water/src/globals.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

pub const WASM_PATH: &str = "./proxy.wasm";
pub const CONFIG_WASM_PATH: &str = "./conf.json";
pub const CONFIG_WASM_PATH: &str = "./config.json";

pub const MAIN: &str = "main";
pub const VERSION_FN: &str = "_water_version";
Expand Down
33 changes: 28 additions & 5 deletions crates/water/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ impl WATERClient {
})
}

pub fn keep_listen(&mut self) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERClient keep listening...",);

let water = match &mut self.stream {
WATERClientType::Listener(ref mut listener) => WATERClientType::Listener(Box::new(
v0::listener::WATERListener::migrate_listener(&self.config, listener.get_core())?,
)
as Box<dyn WATERListenerTrait>),
WATERClientType::Relay(ref mut relay) => WATERClientType::Relay(Box::new(
v0::relay::WATERRelay::migrate_listener(&self.config, relay.get_core())?,
)
as Box<dyn WATERRelayTrait>),
_ => {
return Err(anyhow::anyhow!(
"[HOST] This client is neither a Listener nor a Relay"
));
}
};

Ok(WATERClient {
config: self.config.clone(),
debug: self.debug,
stream: water,
})
}

pub fn set_debug(&mut self, debug: bool) {
self.debug = debug;
}
Expand Down Expand Up @@ -163,16 +189,13 @@ impl WATERClient {

match &mut self.stream {
WATERClientType::Dialer(dialer) => dialer.run_entry_fn(&self.config),
WATERClientType::Listener(listener) => {
// TODO: clone listener here, since we are doing one WATM instance / accept
listener.run_entry_fn(&self.config)
}
WATERClientType::Listener(listener) => listener.run_entry_fn(&self.config),
WATERClientType::Relay(relay) => relay.run_entry_fn(&self.config),
_ => Err(anyhow::anyhow!("This client is not a Runner")),
}
}

// this will run the extry_fn(WATM) in the current thread -- replace Host when running
// this will run the entry_fn(WATM) in the current thread -- replace Host when running
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
info!("[HOST] WATERClient Executing ...");

Expand Down
87 changes: 70 additions & 17 deletions crates/water/src/runtime/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@ impl H2O<Host> {
pub fn init(conf: &WATERConfig) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O initing...");

let mut wasm_config = wasmtime::Config::new();
wasm_config.wasm_threads(true);
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let mut linker: Linker<Host> = Linker::new(&engine);
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let mut store = Store::new(&engine, host);
let store = Store::new(&engine, host);

let mut error_occured = None;

Expand Down Expand Up @@ -64,6 +68,17 @@ impl H2O<Host> {
return Err(anyhow::Error::msg("WATM module version not found"));
}

Self::create_core(conf, linker, store, module, engine, version)
}

pub fn create_core(
conf: &WATERConfig,
mut linker: Linker<Host>,
mut store: Store<Host>,
module: Module,
engine: Engine,
version: Option<Version>,
) -> Result<Self, anyhow::Error> {
store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build());

if store.data().preview1_ctx.is_none() {
Expand Down Expand Up @@ -93,22 +108,9 @@ impl H2O<Host> {
match &version {
Some(Version::V0(ref config)) => match config {
Some(v0_conf) => {
// let v0_conf = Arc::new(Mutex::new(v0_conf.clone()));
v0::funcs::export_tcp_connect(&mut linker, Arc::clone(v0_conf))?;
v0::funcs::export_accept(&mut linker, Arc::clone(v0_conf))?;
v0::funcs::export_defer(&mut linker, Arc::clone(v0_conf))?;

// // if client_type is Listen, then create a listener with the same config
// if conf.client_type == WaterBinType::Listen {
// match v0_conf.lock() {
// Ok(mut v0_conf) => {
// v0_conf.create_listener()?;
// }
// Err(e) => {
// return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
// }
// }
// }
}
None => {
return Err(anyhow::anyhow!(
Expand Down Expand Up @@ -148,6 +150,57 @@ impl H2O<Host> {
})
}

// This function is for migrating the v0 core for listener and relay
// to handle every new connection is creating a new separate core (as v0 spec)
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
info!("[HOST] WATERCore H2O v0_migrating...");

// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
// when migrating from existed listener / relay
let version = match &core.version {
Version::V0(v0conf) => {
match v0conf {
Some(og_v0_conf) => match og_v0_conf.lock() {
Ok(og_v0_conf) => {
let mut new_v0_conf_inner = og_v0_conf.clone();
// reset the new cloned v0conf
new_v0_conf_inner.reset_listener_or_relay();

Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
}
Err(e) => {
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
}
},
None => {
return Err(anyhow::anyhow!("v0_conf is None"))?;
}
}
}
_ => {
return Err(anyhow::anyhow!("This is not a V0 core"))?;
}
};

// NOTE: Some of the followings can reuse the existing core, leave to later explore
let wasm_config = wasmtime::Config::new();

#[cfg(feature = "multithread")]
{
wasm_config.wasm_threads(true);
}

let engine = Engine::new(&wasm_config)?;
let linker: Linker<Host> = Linker::new(&engine);

let module = Module::from_file(&engine, &conf.filepath)?;

let host = Host::default();
let store = Store::new(&engine, host);

Self::create_core(conf, linker, store, module, engine, Some(version))
}

pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
self._init(conf.debug)?;
self._process_config(conf)?; // This is for now needed only by v1_preview
Expand Down
26 changes: 0 additions & 26 deletions crates/water/src/runtime/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,30 +190,4 @@ pub trait WATERTransportTrait: Send {

Ok(handle)
}

// fn read(&mut self, _buf: &mut Vec<u8>) -> Result<i64, anyhow::Error> {
// Err(anyhow::anyhow!("Method not supported"))
// }

// fn write(&mut self, _buf: &[u8]) -> Result<(), anyhow::Error> {
// Err(anyhow::anyhow!("Method not supported"))
// }

// // v0 only
// fn cancel_with(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> {
// Err(anyhow::anyhow!("Method not supported"))
// }

// // v0 only
// fn cancel(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> {
// Err(anyhow::anyhow!("Method not supported"))
// }

// // v0 only
// fn run_entry_fn(
// &mut self,
// _conf: &WATERConfig,
// ) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
// Err(anyhow::anyhow!("Method not supported"))
// }
}
99 changes: 75 additions & 24 deletions crates/water/src/runtime/v0/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl Config {
pub enum V0CRole {
Unknown,
Dialer(i32),
Listener(i32),
Relay(i32, i32), // listener_fd, dialer_fd
Listener(i32, i32), // listener_fd, accepted_fd
Relay(i32, i32, i32), // listener_fd, accepted_fd, dialer_fd
}

// V0 specific configurations
Expand Down Expand Up @@ -93,8 +93,12 @@ impl V0Config {
info!("[HOST] WATERCore V0 connecting to {}", addr);

match &mut self.conn {
V0CRole::Relay(_lis, ref mut conn_fd) => {
V0CRole::Relay(_, _, ref mut conn_fd) => {
// now relay has been built, need to dial
if *conn_fd != -1 {
erikziyunchi marked this conversation as resolved.
Show resolved Hide resolved
return Err(anyhow::Error::msg("Relay already connected"));
}

let conn = std::net::TcpStream::connect(addr)?;
*conn_fd = conn.as_raw_fd();
Ok(conn)
Expand All @@ -116,27 +120,40 @@ impl V0Config {
let listener = std::net::TcpListener::bind(addr)?;

if is_relay {
self.conn = V0CRole::Relay(listener.into_raw_fd(), 0);
self.conn = V0CRole::Relay(listener.into_raw_fd(), -1, -1);
} else {
self.conn = V0CRole::Listener(listener.into_raw_fd());
self.conn = V0CRole::Listener(listener.into_raw_fd(), -1);
}
Ok(())
}

pub fn accept(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn);

match &self.conn {
V0CRole::Listener(listener) => {
let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) };
match self.conn {
V0CRole::Listener(ref mut listener_fd, ref mut accepted_fd) => {
if *accepted_fd != -1 {
return Err(anyhow::Error::msg("Listener already accepted"));
}

let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) };

let (stream, _) = listener.accept()?;
self.conn = V0CRole::Listener(listener.into_raw_fd()); // makde sure it is not closed after scope

*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
erikziyunchi marked this conversation as resolved.
Show resolved Hide resolved
*accepted_fd = stream.as_raw_fd();

Ok(stream)
}
V0CRole::Relay(listener, _) => {
let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) };
V0CRole::Relay(ref mut listener_fd, ref mut accepted_fd, _) => {
if *accepted_fd != -1 {
return Err(anyhow::Error::msg("Relay already accepted"));
}

let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) };
let (stream, _) = listener.accept()?;
self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); // makde sure it is not closed after scope
*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
*accepted_fd = stream.as_raw_fd();
Ok(stream)
}
_ => Err(anyhow::Error::msg("not a listener")),
Expand All @@ -146,22 +163,56 @@ impl V0Config {
pub fn defer(&mut self) {
info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn);

match &self.conn {
V0CRole::Listener(_listener) => {
// TODO: Listener shouldn't be deferred, but the stream it connected to should be
// let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) };
// drop(listener);
match self.conn {
V0CRole::Listener(_, ref mut accepted_fd) => {
// The accepted stream should be defered, not the listener
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}
V0CRole::Dialer(conn) => {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) };
V0CRole::Dialer(conn_fd) => {
let conn = unsafe { std::net::TcpStream::from_raw_fd(conn_fd) };
drop(conn);
}
V0CRole::Relay(_listener, conn) => {
// Listener shouldn't be deferred, like the above reason
// let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) };
// drop(listener);
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) };
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default

let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
drop(conn);
*conn_fd = -1; // set it back to default
}
_ => {}
}
}

pub fn reset_listener_or_relay(&mut self) {
info!(
"[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...",
self.conn
);

match self.conn {
V0CRole::Listener(_, ref mut accepted_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}
}
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
if *accepted_fd != -1 {
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
drop(accepted_conn);
*accepted_fd = -1; // set it back to default
}

if *conn_fd != -1 {
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
drop(conn);
*conn_fd = -1; // set it back to default
}
}
_ => {}
}
Expand Down
Loading
Loading