From 45183127b92d225bd5ce649cb564fba861834d60 Mon Sep 17 00:00:00 2001 From: jmwample <8297368+jmwample@users.noreply.github.com> Date: Tue, 17 Oct 2023 23:30:34 -0600 Subject: [PATCH] more better github actions --- .github/workflows/rust.yml | 12 +- crates/water/src/config/mod.rs | 10 +- crates/water/src/config/wasm_shared_config.rs | 11 +- crates/water/src/errors/mod.rs | 1 + crates/water/src/lib.rs | 4 +- crates/water/src/runtime/core.rs | 63 +++--- crates/water/src/runtime/listener.rs | 138 +++++++++--- crates/water/src/runtime/mod.rs | 57 ++--- crates/water/src/runtime/net/mod.rs | 5 +- crates/water/src/runtime/runner.rs | 20 +- crates/water/src/runtime/stream.rs | 139 +++++++++--- crates/water/src/runtime/v0/funcs.rs | 206 +++++++++--------- crates/water/src/runtime/v0/mod.rs | 2 +- crates/water/src/runtime/v1/funcs.rs | 204 +++++++++-------- crates/water/src/runtime/v1/mod.rs | 2 +- crates/water/src/runtime/version.rs | 4 +- .../water/src/runtime/version_common/funcs.rs | 36 +-- .../water/src/runtime/version_common/mod.rs | 2 +- crates/water/src/utils/mod.rs | 1 + crates/water_wasm/src/config.rs | 2 +- crates/water_wasm/src/connections.rs | 126 ++++++----- crates/water_wasm/src/decoder.rs | 8 +- crates/water_wasm/src/dialer.rs | 33 +-- crates/water_wasm/src/encoder.rs | 7 +- crates/water_wasm/src/lib.rs | 18 +- crates/water_wasm/src/version.rs | 2 +- examples/clients/cli/src/cli.rs | 10 +- examples/clients/cli/src/main.rs | 9 +- .../echo_client/src/async_socks5_listener.rs | 135 ++++++++---- examples/water_bins/echo_client/src/lib.rs | 51 +++-- .../water_bins/ss_client_wasm_v1/src/aead.rs | 50 +++-- .../ss_client_wasm_v1/src/client.rs | 40 ++-- .../ss_client_wasm_v1/src/crypto_io.rs | 25 ++- .../water_bins/ss_client_wasm_v1/src/lib.rs | 18 +- .../ss_client_wasm_v1/src/socks5.rs | 115 +++++++--- .../water_bins/ss_client_wasm_v1/src/utils.rs | 3 +- .../water_bins/ss_client_wasm_v1/src/water.rs | 98 ++++++--- 37 files changed, 1028 insertions(+), 639 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e70d12e..12b50d9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -12,13 +12,19 @@ env: jobs: build: strategy: - fail-fast: false + fail-fast: true matrix: os: [ "ubuntu-latest", "macos-latest" ] + runs-on: ${{ matrix.os }} + steps: - uses: actions/checkout@v3 + - name: Format + run: cargo fmt --all -- --check + - name: Lint + run: cargo clippy --workspace --all-targets --verbose --all-features + - name: Test + run: cargo test --verbose --workspace --all-features - name: Build run: cargo build --verbose - - name: Test - run: cargo test --verbose diff --git a/crates/water/src/config/mod.rs b/crates/water/src/config/mod.rs index f289585..731d7b0 100644 --- a/crates/water/src/config/mod.rs +++ b/crates/water/src/config/mod.rs @@ -9,7 +9,13 @@ pub struct WATERConfig { } impl WATERConfig { - pub fn init(wasm_path: String, entry_fn: String, config_wasm: String, client_type: u32, debug: bool) -> Result { + pub fn init( + wasm_path: String, + entry_fn: String, + config_wasm: String, + client_type: u32, + debug: bool, + ) -> Result { Ok(WATERConfig { filepath: wasm_path, entry_fn: entry_fn, @@ -18,4 +24,4 @@ impl WATERConfig { debug: debug, }) } -} \ No newline at end of file +} diff --git a/crates/water/src/config/wasm_shared_config.rs b/crates/water/src/config/wasm_shared_config.rs index 0d8bbe3..e561a0b 100644 --- a/crates/water/src/config/wasm_shared_config.rs +++ b/crates/water/src/config/wasm_shared_config.rs @@ -1,18 +1,17 @@ +use serde::{Deserialize, Serialize}; use std::mem; -use serde::{Serialize, Deserialize}; #[repr(C)] pub struct WASMSharedConfig { - -// pub key: u64, // a pointer to a key string's byte-view -// pub size: u64, // size for the key + // pub key: u64, // a pointer to a key string's byte-view + // pub size: u64, // size for the key } impl WASMSharedConfig { pub fn to_bytes(&self) -> Vec { let size = mem::size_of::(); let ptr = self as *const Self; - + let bytes_slice = unsafe { std::slice::from_raw_parts(ptr as *const u8, size) }; let bytes = bytes_slice.to_vec(); bytes @@ -42,7 +41,7 @@ impl StreamConfig { pub fn to_bytes(&self) -> Vec { let size = mem::size_of::(); let ptr = self as *const Self; - + let bytes_slice = unsafe { std::slice::from_raw_parts(ptr as *const u8, size) }; let bytes = bytes_slice.to_vec(); bytes diff --git a/crates/water/src/errors/mod.rs b/crates/water/src/errors/mod.rs index e69de29..8b13789 100644 --- a/crates/water/src/errors/mod.rs +++ b/crates/water/src/errors/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/water/src/lib.rs b/crates/water/src/lib.rs index bcd1ed6..c4cd117 100644 --- a/crates/water/src/lib.rs +++ b/crates/water/src/lib.rs @@ -8,7 +8,7 @@ extern crate wasmtime_wasi; extern crate wasmtime_wasi_threads; pub mod config; -pub mod runtime; pub mod errors; -pub mod utils; pub mod globals; +pub mod runtime; +pub mod utils; diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index c7ab524..2379e99 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -19,25 +19,28 @@ pub struct H2O { impl H2O { pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERCore H2O initing..."); - + let mut wasm_config = wasmtime::Config::new(); wasm_config.wasm_threads(true); - + let engine = Engine::new(&wasm_config)?; let mut linker: Linker = Linker::new(&engine); - + let module = Module::from_file(&engine, &conf.filepath)?; let host = Host::default(); let mut store = Store::new(&engine, host); let version = module.exports().find_map(|global| { - info!("[HOST] WATERCore finding exported symbols from WASM bin: {:?}", global.name()); + info!( + "[HOST] WATERCore finding exported symbols from WASM bin: {:?}", + global.name() + ); match Version::from_str(global.name()) { Some(v) => { info!("[HOST] WATERCore found version: {:?}", v.as_str()); Some(v) - }, + } None => None, } }); @@ -45,47 +48,43 @@ impl H2O { if version.is_none() { return Err(anyhow::Error::msg("WASM module version not found")); } - + // let path = unsafe { Dir::open_ambient_dir(".", ambient_authority())? }; - + // store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().preopened_dir(path, ".")?.build()); store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); - - wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| { - h.preview1_ctx.as_mut().unwrap() - })?; - + + wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| h.preview1_ctx.as_mut().unwrap())?; + // initializing stuff for multithreading #[cfg(feature = "multithread")] { - store.data_mut().wasi_threads = Some(Arc::new(WasiThreadsCtx::new( module.clone(), Arc::new(linker.clone()), )?)); - + wasmtime_wasi_threads::add_to_linker(&mut linker, &store, &module, |h: &mut Host| { h.wasi_threads.as_ref().unwrap() })?; } - // export functions -- version dependent -- has to be done before instantiate match &version { Some(Version::V0) => { v0::funcs::export_tcp_connect(&mut linker); v0::funcs::export_tcplistener_create(&mut linker); - }, + } Some(Version::V1) => { v1::funcs::export_tcp_connect(&mut linker); v1::funcs::export_tcplistener_create(&mut linker); - }, + } _ => {} // add export funcs for other versions here } // export functions -- version independent version_common::funcs::export_config(&mut linker, conf.config_wasm.clone()); - + let instance = linker.instantiate(&mut store, &module)?; Ok(H2O { @@ -96,7 +95,7 @@ impl H2O { instance: instance, store: store, module: module, - }) + }) } pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { @@ -117,7 +116,7 @@ impl H2O { // TODO: check if we need to pass in any arguments / configs later let params = vec![Val::I32(debug as i32); init_fn.ty(&self.store).params().len()]; match init_fn.call(&mut self.store, ¶ms, &mut []) { - Ok(_) => {}, + Ok(_) => {} Err(e) => return Err(anyhow::Error::msg(format!("init function failed: {}", e))), } @@ -130,23 +129,35 @@ impl H2O { // _required to implement _process_config(i32) in WASM, which will be parsing all the configurations let config_fn = match self.instance.get_func(&mut self.store, CONFIG_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg("_process_config function not found in WASM")), + None => { + return Err(anyhow::Error::msg( + "_process_config function not found in WASM", + )) + } }; // open the config file and insert to WASM let dir = Dir::open_ambient_dir(".", ambient_authority())?; // Open the root directory - let wasi_file = dir.open_with(&config.config_wasm, OpenOptions::new().read(true).write(true))?; + let wasi_file = dir.open_with( + &config.config_wasm, + OpenOptions::new().read(true).write(true), + )?; let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); - + let ctx = self.store.data_mut().preview1_ctx.as_mut().unwrap(); let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all())? as i32; let params = vec![Val::I32(config_fd); config_fn.ty(&self.store).params().len()]; match config_fn.call(&mut self.store, ¶ms, &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("_process_config function in WASM failed: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "_process_config function in WASM failed: {}", + e + ))) + } } Ok(()) } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/listener.rs b/crates/water/src/runtime/listener.rs index 720bb1a..86ec2b9 100644 --- a/crates/water/src/runtime/listener.rs +++ b/crates/water/src/runtime/listener.rs @@ -5,11 +5,11 @@ pub struct WATERListener { // the reader in WASM (read from net -- n2w) // returns the number of bytes read - pub reader: Func, - + pub reader: Func, + // the writer in WASM (write to net -- w2n) // returns the number of bytes written - pub writer: Func, + pub writer: Func, pub caller_reader: UnixStream, // the reader in Caller (read from WASM -- w2u) pub caller_writer: UnixStream, // the writer in Caller (write to WASM -- u2w) @@ -18,29 +18,41 @@ pub struct WATERListener { } impl WATERListener { - /// Read from the target address pub fn read(&mut self, buf: &mut Vec) -> Result { info!("[HOST] WATERStream reading..."); let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; match self.reader.call(&mut self.core.store, &[], &mut res) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", READER_FN, e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + READER_FN, e + ))) + } } let nums: i64 = match res.get(0) { - Some(wasmtime::Val::I64(v)) => { - *v - }, - _ => return Err(anyhow::Error::msg(format!("{} function returned unexpected type / no return", READER_FN))), + Some(wasmtime::Val::I64(v)) => *v, + _ => { + return Err(anyhow::Error::msg(format!( + "{} function returned unexpected type / no return", + READER_FN + ))) + } }; // read from WASM's caller_reader buf.resize(nums as usize, 0); match self.caller_reader.read(&mut buf[..]) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("failed to read from caller_reader: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to read from caller_reader: {}", + e + ))) + } } Ok(nums) @@ -52,8 +64,13 @@ impl WATERListener { // write to WASM's caller_writer match self.caller_writer.write_all(buf) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("failed to write to caller_writer: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to write to caller_writer: {}", + e + ))) + } } let params = vec![Val::I64(buf.len() as i64)]; @@ -63,78 +80,127 @@ impl WATERListener { match res.get(0) { Some(wasmtime::Val::I64(v)) => { if *v != buf.len() as i64 { - return Err(anyhow::Error::msg(format!("WASM write function returned unexpected value: {}", *v))); + return Err(anyhow::Error::msg(format!( + "WASM write function returned unexpected value: {}", + *v + ))); } - }, - _ => return Err(anyhow::Error::msg("user_write_done function returned unexpected type / no return")), + } + _ => { + return Err(anyhow::Error::msg( + "user_write_done function returned unexpected type / no return", + )) + } }; - }, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WRITER_FN, e))), + } + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WRITER_FN, e + ))) + } } Ok(()) } /// Listening at the addr:port with running the WASM listen function - pub fn listen(&mut self, conf: &WATERConfig, addr: &str, port: u16) -> Result<(), anyhow::Error> { + pub fn listen( + &mut self, + conf: &WATERConfig, + addr: &str, + port: u16, + ) -> Result<(), anyhow::Error> { info!("[HOST] WATERStream listening..."); // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port - let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); + let fnc = self + .core + .instance + .get_func(&mut self.core.store, &conf.entry_fn) + .unwrap(); match fnc.call(&mut self.core.store, &[], &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("connect function failed: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "connect function failed: {}", + e + ))) + } } - Ok(()) } - + pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERStream init..."); let mut core = H2O::init(conf)?; core._prepare(conf)?; - + // constructing 2 pairs of UnixStream for communicating between WASM and Host // returns (read_end, write_end) for caller let (caller_read_end, water_write_end) = UnixStream::pair()?; let (water_read_end, caller_write_end) = UnixStream::pair()?; - let water_write_file = unsafe { cap_std::fs::File::from_raw_fd(water_write_end.as_raw_fd()) }; + let water_write_file = + unsafe { cap_std::fs::File::from_raw_fd(water_write_end.as_raw_fd()) }; let water_read_file = unsafe { cap_std::fs::File::from_raw_fd(water_read_end.as_raw_fd()) }; - + // insert file here let wasi_water_reader = wasmtime_wasi::sync::file::File::from_cap_std(water_read_file); let wasi_water_writer = wasmtime_wasi::sync::file::File::from_cap_std(water_write_file); std::mem::forget(water_write_end); std::mem::forget(water_read_end); - + let ctx = core.store.data_mut().preview1_ctx.as_mut().unwrap(); let water_reader_fd = ctx.push_file(Box::new(wasi_water_reader), FileAccessMode::all())?; let water_writer_fd = ctx.push_file(Box::new(wasi_water_writer), FileAccessMode::all())?; let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WATER_BRIDGING_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WATER_BRIDGING_FN + ))) + } }; - let params = vec![Val::I32(water_reader_fd as i32), Val::I32(water_writer_fd as i32)]; + let params = vec![ + Val::I32(water_reader_fd as i32), + Val::I32(water_writer_fd as i32), + ]; match water_bridging.call(&mut core.store, ¶ms, &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WATER_BRIDGING_FN, e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WATER_BRIDGING_FN, e + ))) + } } // getting reader & writer func from WASM let reader = match core.instance.get_func(&mut core.store, READER_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", READER_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + READER_FN + ))) + } }; let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WRITER_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WRITER_FN + ))) + } }; let runtime = WATERListener { @@ -149,4 +215,4 @@ impl WATERListener { Ok(runtime) } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/mod.rs b/crates/water/src/runtime/mod.rs index 7064c2b..2327dfa 100644 --- a/crates/water/src/runtime/mod.rs +++ b/crates/water/src/runtime/mod.rs @@ -1,13 +1,13 @@ // =================== MODULES =================== -pub mod net; -pub mod v0; -pub mod v1; -pub mod version_common; -pub mod stream; pub mod core; pub mod listener; +pub mod net; pub mod runner; +pub mod stream; +pub mod v0; +pub mod v1; pub mod version; +pub mod version_common; // =================== STD Imports =================== use std::{ @@ -25,32 +25,25 @@ use cap_std::{ os::unix::net::UnixStream, }; use tracing::{debug, info}; -use wasi_common::{ - file::FileAccessMode, - WasiCtx, - WasiFile, -}; +use wasi_common::{file::FileAccessMode, WasiCtx, WasiFile}; use wasmtime::*; use wasmtime_wasi::sync::{Dir, WasiCtxBuilder}; use wasmtime_wasi_threads::WasiThreadsCtx; // =================== CURRENT CRATE IMPORTS =================== use crate::{ - config::{WATERConfig}, - globals::{ - CONFIG_FN, INIT_FN, READER_FN, WATER_BRIDGING_FN, WRITER_FN, - }, + config::WATERConfig, + globals::{CONFIG_FN, INIT_FN, READER_FN, WATER_BRIDGING_FN, WRITER_FN}, }; // =================== MODULES' DEPENDENCIES =================== -use self::core::{H2O, Host}; +use self::core::{Host, H2O}; use self::listener::WATERListener; use self::net::{ConnectFile, File, ListenFile}; use self::runner::WATERRunner; use self::stream::WATERStream; use self::version::Version; - // =================== WATERClient Definition =================== pub enum WATERClientType { Dialer(WATERStream), @@ -95,74 +88,70 @@ impl WATERClient { pub fn execute(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient Executing ..."); - + match &mut self.stream { WATERClientType::Runner(runner) => { runner.run(&self.config)?; - }, + } _ => { return Err(anyhow::anyhow!("This client is not a Runner")); } } Ok(()) } - + pub fn connect(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient connecting ..."); match &mut self.stream { WATERClientType::Dialer(dialer) => { dialer.connect(&self.config, addr, port)?; - }, + } _ => { return Err(anyhow::anyhow!("This client is not a listener")); } } Ok(()) } - + pub fn listen(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient listening ..."); match &mut self.stream { WATERClientType::Listener(listener) => { listener.listen(&self.config, addr, port)?; - }, + } _ => { return Err(anyhow::anyhow!("This client is not a listener")); } } Ok(()) } - + pub fn read(&mut self, buf: &mut Vec) -> Result { let read_bytes = match self.stream { - WATERClientType::Dialer(ref mut dialer) => { - dialer.read(buf)? - }, - WATERClientType::Listener(ref mut listener) => { - listener.read(buf)? - }, + WATERClientType::Dialer(ref mut dialer) => dialer.read(buf)?, + WATERClientType::Listener(ref mut listener) => listener.read(buf)?, _ => { return Err(anyhow::anyhow!("This client is not supporting read")); } }; - + Ok(read_bytes) } - + pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { match self.stream { WATERClientType::Dialer(ref mut dialer) => { dialer.write(buf)?; - }, + } WATERClientType::Listener(ref mut listener) => { listener.write(buf)?; - }, + } _ => { return Err(anyhow::anyhow!("This client is not supporting write")); } } Ok(()) } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/net/mod.rs b/crates/water/src/runtime/net/mod.rs index 92b42ba..487dda3 100644 --- a/crates/water/src/runtime/net/mod.rs +++ b/crates/water/src/runtime/net/mod.rs @@ -1,7 +1,7 @@ use serde::{de::Error as _, Deserialize, Deserializer, Serialize}; -use std::ops::Deref; use std::convert::{TryFrom, TryInto}; +use std::ops::Deref; // ========= Definition for files shared between WASM & Host for creating connections =========== // TODO: migrate these code to a src file later @@ -83,7 +83,6 @@ pub enum File { // /// File descriptor of stderr // #[serde(rename = "stderr")] // Stderr(StdioFile), - /// File descriptor of a listen socket #[serde(rename = "listen")] Listen(ListenFile), @@ -175,4 +174,4 @@ pub enum ConnectFile { #[serde(default = "default_tcp_port")] port: u16, }, -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/runner.rs b/crates/water/src/runtime/runner.rs index 7e8dbb7..e20bf04 100644 --- a/crates/water/src/runtime/runner.rs +++ b/crates/water/src/runtime/runner.rs @@ -9,25 +9,27 @@ impl WATERRunner { pub fn run(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { info!("[HOST] WATERRunner running..."); - let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); + let fnc = self + .core + .instance + .get_func(&mut self.core.store, &conf.entry_fn) + .unwrap(); match fnc.call(&mut self.core.store, &[], &mut []) { - Ok(_) => {}, + Ok(_) => {} Err(e) => return Err(anyhow::Error::msg(format!("run function failed: {}", e))), } - + Ok(()) } - + pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERRunner init..."); let mut core = H2O::init(conf)?; core._prepare(conf)?; - - let runtime = WATERRunner { - core, - }; + + let runtime = WATERRunner { core }; Ok(runtime) } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/stream.rs b/crates/water/src/runtime/stream.rs index 345caf5..4ce0801 100644 --- a/crates/water/src/runtime/stream.rs +++ b/crates/water/src/runtime/stream.rs @@ -16,11 +16,11 @@ pub struct WATERStream { // the reader in WASM (read from net -- n2w) // returns the number of bytes read - pub reader: Func, - + pub reader: Func, + // the writer in WASM (write to net -- w2n) // returns the number of bytes written - pub writer: Func, + pub writer: Func, pub caller_io: UnixStream, // the pipe for communcating between Host and WASM @@ -28,29 +28,41 @@ pub struct WATERStream { } impl WATERStream { - /// Read from the target address pub fn read(&mut self, buf: &mut Vec) -> Result { debug!("[HOST] WATERStream reading..."); let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; match self.reader.call(&mut self.core.store, &[], &mut res) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", READER_FN, e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + READER_FN, e + ))) + } } let nums: i64 = match res.get(0) { - Some(wasmtime::Val::I64(v)) => { - *v - }, - _ => return Err(anyhow::Error::msg(format!("{} function returned unexpected type / no return", READER_FN))), + Some(wasmtime::Val::I64(v)) => *v, + _ => { + return Err(anyhow::Error::msg(format!( + "{} function returned unexpected type / no return", + READER_FN + ))) + } }; // read from WASM's caller_reader buf.resize(nums as usize, 0); match self.caller_io.read(&mut buf[..]) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("failed to read from caller_reader: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to read from caller_reader: {}", + e + ))) + } } Ok(nums) @@ -62,8 +74,13 @@ impl WATERStream { // write to WASM's caller_writer match self.caller_io.write_all(buf) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("failed to write to caller_writer: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "failed to write to caller_writer: {}", + e + ))) + } } let params = vec![Val::I64(buf.len() as i64)]; @@ -73,78 +90,128 @@ impl WATERStream { match res.get(0) { Some(wasmtime::Val::I64(v)) => { if *v != buf.len() as i64 { - return Err(anyhow::Error::msg(format!("WASM write function returned unexpected value: {}", *v))); + return Err(anyhow::Error::msg(format!( + "WASM write function returned unexpected value: {}", + *v + ))); } - }, - _ => return Err(anyhow::Error::msg("user_write_done function returned unexpected type / no return")), + } + _ => { + return Err(anyhow::Error::msg( + "user_write_done function returned unexpected type / no return", + )) + } }; - }, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WRITER_FN, e))), + } + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WRITER_FN, e + ))) + } } Ok(()) } /// Connect to the target address with running the WASM connect function - pub fn connect(&mut self, conf: &WATERConfig, addr: &str, port: u16) -> Result<(), anyhow::Error> { + pub fn connect( + &mut self, + conf: &WATERConfig, + addr: &str, + port: u16, + ) -> Result<(), anyhow::Error> { info!("[HOST] WATERStream connecting..."); // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port // let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); - let fnc = match self.core.instance.get_func(&mut self.core.store, &conf.entry_fn) { + let fnc = match self + .core + .instance + .get_func(&mut self.core.store, &conf.entry_fn) + { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", conf.entry_fn))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + conf.entry_fn + ))) + } }; match fnc.call(&mut self.core.store, &[], &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("connect function failed: {}", e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "connect function failed: {}", + e + ))) + } } - Ok(()) } - + pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERStream init..."); let mut core = H2O::init(conf)?; core._prepare(conf)?; - + // constructing a pair of UnixStream for communicating between WASM and Host let (caller_end, water_end) = UnixStream::pair()?; - + let water_end_file = unsafe { cap_std::fs::File::from_raw_fd(water_end.as_raw_fd()) }; - + // insert file here let water_end_file = wasmtime_wasi::sync::file::File::from_cap_std(water_end_file); std::mem::forget(water_end); // forget the water_end, so that it won't be closed - + let ctx = core.store.data_mut().preview1_ctx.as_mut().unwrap(); let water_end_fd = ctx.push_file(Box::new(water_end_file), FileAccessMode::all())?; let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WATER_BRIDGING_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WATER_BRIDGING_FN + ))) + } }; // let params = vec![Val::I32(water_reader_fd as i32), Val::I32(water_writer_fd as i32)]; let params = vec![Val::I32(water_end_fd as i32)]; match water_bridging.call(&mut core.store, ¶ms, &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WATER_BRIDGING_FN, e))), + Ok(_) => {} + Err(e) => { + return Err(anyhow::Error::msg(format!( + "{} function failed: {}", + WATER_BRIDGING_FN, e + ))) + } } // getting reader & writer func from WASM let reader = match core.instance.get_func(&mut core.store, READER_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", READER_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + READER_FN + ))) + } }; let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { Some(func) => func, - None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WRITER_FN))), + None => { + return Err(anyhow::Error::msg(format!( + "{} function not found in WASM", + WRITER_FN + ))) + } }; let runtime = WATERStream { @@ -158,4 +225,4 @@ impl WATERStream { Ok(runtime) } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/v0/funcs.rs b/crates/water/src/runtime/v0/funcs.rs index 11f931a..1214799 100644 --- a/crates/water/src/runtime/v0/funcs.rs +++ b/crates/water/src/runtime/v0/funcs.rs @@ -1,110 +1,122 @@ -use crate::runtime::*; use crate::config::wasm_shared_config::StreamConfig; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use crate::runtime::*; use std::convert::TryInto; - +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; // TODO: rename this to dial_v1, since it has the ability to let WASM choose ip:port pub fn export_tcp_connect(linker: &mut Linker) { - linker.func_wrap("env", "connect_tcp", move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ - - info!("[WASM] invoking Host exported Dial func connect_tcp..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; - - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); - - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; - - let mut config: StreamConfig; - unsafe { - config = bincode::deserialize(&data).expect("Failed to deserialize"); - } - - let connect_file = File::Connect(ConnectFile::Tcp { - name: Some(config.name.clone().try_into().unwrap()), - port: config.port as u16, - host: config.addr.clone().into() - }); - - // Get the pair here addr:port - let (host, port) = match connect_file { - File::Connect(listen_file) => match listen_file { - ConnectFile::Tcp { host, port, .. } | ConnectFile::Tls { host, port, .. } => (host, port), + linker + .func_wrap( + "env", + "connect_tcp", + move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { + info!("[WASM] invoking Host exported Dial func connect_tcp..."); + + let memory = match caller.get_export("memory") { + Some(Extern::Memory(memory)) => memory, + _ => return -1, + }; + + // Get a slice of the memory. + let mem_slice = memory.data_mut(&mut caller); + + // Use the offset and size to get the relevant part of the memory. + let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; + + let mut config: StreamConfig; + unsafe { + config = bincode::deserialize(&data).expect("Failed to deserialize"); + } + + let connect_file = File::Connect(ConnectFile::Tcp { + name: Some(config.name.clone().try_into().unwrap()), + port: config.port as u16, + host: config.addr.clone().into(), + }); + + // Get the pair here addr:port + let (host, port) = match connect_file { + File::Connect(listen_file) => match listen_file { + ConnectFile::Tcp { host, port, .. } + | ConnectFile::Tls { host, port, .. } => (host, port), + }, + _ => ("Wrong".into(), 0), + }; + + let tcp = match (host.as_str(), port) { + ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, port), + )), + addr => std::net::TcpStream::connect(addr), + } + .map(TcpStream::from_std) + .context("failed to connect to endpoint") + .unwrap(); + + // Connecting Tcp + let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); + + // Get the WasiCtx of the caller(WASM), then insert_file into it + let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); + ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 }, - _ => { ("Wrong".into(), 0) } - }; - - let tcp = match (host.as_str(), port) { - ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::LOCALHOST, - port, - ))), - addr => std::net::TcpStream::connect(addr), - } - .map(TcpStream::from_std) - .context("failed to connect to endpoint").unwrap(); - - // Connecting Tcp - let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); - - // Get the WasiCtx of the caller(WASM), then insert_file into it - let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); - ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 - }).unwrap(); + ) + .unwrap(); } // TODO: rename this to dial_v1, since it has the ability to let WASM listen on a TcpListener pub fn export_tcplistener_create(linker: &mut Linker) { - linker.func_wrap("env", "create_listen", move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ - - info!("[WASM] invoking Host exported Dial func create_tcp_listener..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; - - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); - - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; - - let mut config: StreamConfig; - unsafe { - config = bincode::deserialize(&data).expect("Failed to deserialize"); - } - - let listener_file = File::Listen(ListenFile::Tcp { - name: config.name.clone().try_into().unwrap(), - port: config.port as u16, - addr: config.addr.clone().into() - }); - - // Get the pair here addr:port - let (addr, port) = match listener_file { - File::Listen(listen_file) => match listen_file { - ListenFile::Tcp { addr, port, .. } | ListenFile::Tls { addr, port, .. } => (addr, port), + linker + .func_wrap( + "env", + "create_listen", + move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { + info!("[WASM] invoking Host exported Dial func create_tcp_listener..."); + + let memory = match caller.get_export("memory") { + Some(Extern::Memory(memory)) => memory, + _ => return -1, + }; + + // Get a slice of the memory. + let mem_slice = memory.data_mut(&mut caller); + + // Use the offset and size to get the relevant part of the memory. + let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; + + let mut config: StreamConfig; + unsafe { + config = bincode::deserialize(&data).expect("Failed to deserialize"); + } + + let listener_file = File::Listen(ListenFile::Tcp { + name: config.name.clone().try_into().unwrap(), + port: config.port as u16, + addr: config.addr.clone().into(), + }); + + // Get the pair here addr:port + let (addr, port) = match listener_file { + File::Listen(listen_file) => match listen_file { + ListenFile::Tcp { addr, port, .. } | ListenFile::Tls { addr, port, .. } => { + (addr, port) + } + }, + _ => ("Wrong".into(), 0), + }; + + // Creating Tcp Listener + let tcp = std::net::TcpListener::bind((addr.as_str(), port)).unwrap(); + let tcp = TcpListener::from_std(tcp); + // tcp.set_nonblocking(true); + let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); + + // Get the WasiCtx of the caller(WASM), then insert_file into it + let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); + ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 }, - _ => { ("Wrong".into(), 0) } - }; - - // Creating Tcp Listener - let tcp = std::net::TcpListener::bind((addr.as_str(), port)).unwrap(); - let tcp = TcpListener::from_std(tcp); - // tcp.set_nonblocking(true); - let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); - - // Get the WasiCtx of the caller(WASM), then insert_file into it - let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); - ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 - }).unwrap(); + ) + .unwrap(); } // Generically link dial functions @@ -122,4 +134,4 @@ pub fn export_tcplistener_create(linker: &mut Linker) { // Err(e) => { eprintln!("Failed to define function: {}", e) }, // }; // } -// } \ No newline at end of file +// } diff --git a/crates/water/src/runtime/v0/mod.rs b/crates/water/src/runtime/v0/mod.rs index b1f56f5..5ef91ae 100644 --- a/crates/water/src/runtime/v0/mod.rs +++ b/crates/water/src/runtime/v0/mod.rs @@ -1 +1 @@ -pub mod funcs; \ No newline at end of file +pub mod funcs; diff --git a/crates/water/src/runtime/v1/funcs.rs b/crates/water/src/runtime/v1/funcs.rs index 9b3d149..79609ce 100644 --- a/crates/water/src/runtime/v1/funcs.rs +++ b/crates/water/src/runtime/v1/funcs.rs @@ -1,109 +1,121 @@ -use crate::runtime::*; use crate::config::wasm_shared_config::StreamConfig; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use crate::runtime::*; use std::convert::TryInto; - +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; // TODO: rename this to dial_v1, since it has the ability to let WASM choose ip:port pub fn export_tcp_connect(linker: &mut Linker) { - linker.func_wrap("env", "connect_tcp", move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ - - info!("[WASM] invoking Host exported Dial func connect_tcp..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; - - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); - - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; - - let mut config: StreamConfig; - unsafe { - config = bincode::deserialize(&data).expect("Failed to deserialize"); - } - - let connect_file = File::Connect(ConnectFile::Tcp { - name: Some(config.name.clone().try_into().unwrap()), - port: config.port as u16, - host: config.addr.clone().into() - }); - - // Get the pair here addr:port - let (host, port) = match connect_file { - File::Connect(listen_file) => match listen_file { - ConnectFile::Tcp { host, port, .. } | ConnectFile::Tls { host, port, .. } => (host, port), + linker + .func_wrap( + "env", + "connect_tcp", + move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { + info!("[WASM] invoking Host exported Dial func connect_tcp..."); + + let memory = match caller.get_export("memory") { + Some(Extern::Memory(memory)) => memory, + _ => return -1, + }; + + // Get a slice of the memory. + let mem_slice = memory.data_mut(&mut caller); + + // Use the offset and size to get the relevant part of the memory. + let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; + + let mut config: StreamConfig; + unsafe { + config = bincode::deserialize(&data).expect("Failed to deserialize"); + } + + let connect_file = File::Connect(ConnectFile::Tcp { + name: Some(config.name.clone().try_into().unwrap()), + port: config.port as u16, + host: config.addr.clone().into(), + }); + + // Get the pair here addr:port + let (host, port) = match connect_file { + File::Connect(listen_file) => match listen_file { + ConnectFile::Tcp { host, port, .. } + | ConnectFile::Tls { host, port, .. } => (host, port), + }, + _ => ("Wrong".into(), 0), + }; + + let tcp = match (host.as_str(), port) { + ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, port), + )), + addr => std::net::TcpStream::connect(addr), + } + .map(TcpStream::from_std) + .context("failed to connect to endpoint") + .unwrap(); + + // Connecting Tcp + let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); + + // Get the WasiCtx of the caller(WASM), then insert_file into it + let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); + ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 }, - _ => { ("Wrong".into(), 0) } - }; - - let tcp = match (host.as_str(), port) { - ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::LOCALHOST, - port, - ))), - addr => std::net::TcpStream::connect(addr), - } - .map(TcpStream::from_std) - .context("failed to connect to endpoint").unwrap(); - - // Connecting Tcp - let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); - - // Get the WasiCtx of the caller(WASM), then insert_file into it - let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); - ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 - }).unwrap(); + ) + .unwrap(); } // TODO: rename this to dial_v1, since it has the ability to let WASM listen on a TcpListener pub fn export_tcplistener_create(linker: &mut Linker) { - linker.func_wrap("env", "create_listen", move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32{ - - info!("[WASM] invoking Host exported Dial func create_tcp_listener..."); - - let memory = match caller.get_export("memory") { - Some(Extern::Memory(memory)) => memory, - _ => return -1, - }; - - // Get a slice of the memory. - let mem_slice = memory.data_mut(&mut caller); - - // Use the offset and size to get the relevant part of the memory. - let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; - - let mut config: StreamConfig; - unsafe { - config = bincode::deserialize(&data).expect("Failed to deserialize"); - } - - let listener_file = File::Listen(ListenFile::Tcp { - name: config.name.clone().try_into().unwrap(), - port: config.port as u16, - addr: config.addr.clone().into() - }); - - // Get the pair here addr:port - let (addr, port) = match listener_file { - File::Listen(listen_file) => match listen_file { - ListenFile::Tcp { addr, port, .. } | ListenFile::Tls { addr, port, .. } => (addr, port), + linker + .func_wrap( + "env", + "create_listen", + move |mut caller: Caller<'_, Host>, ptr: u32, size: u32| -> i32 { + info!("[WASM] invoking Host exported Dial func create_tcp_listener..."); + + let memory = match caller.get_export("memory") { + Some(Extern::Memory(memory)) => memory, + _ => return -1, + }; + + // Get a slice of the memory. + let mem_slice = memory.data_mut(&mut caller); + + // Use the offset and size to get the relevant part of the memory. + let data = &mut mem_slice[ptr as usize..(ptr as usize + size as usize)]; + + let mut config: StreamConfig; + unsafe { + config = bincode::deserialize(&data).expect("Failed to deserialize"); + } + + let listener_file = File::Listen(ListenFile::Tcp { + name: config.name.clone().try_into().unwrap(), + port: config.port as u16, + addr: config.addr.clone().into(), + }); + + // Get the pair here addr:port + let (addr, port) = match listener_file { + File::Listen(listen_file) => match listen_file { + ListenFile::Tcp { addr, port, .. } | ListenFile::Tls { addr, port, .. } => { + (addr, port) + } + }, + _ => ("Wrong".into(), 0), + }; + + // Creating Tcp Listener + let tcp = std::net::TcpListener::bind((addr.as_str(), port)).unwrap(); + let tcp = TcpListener::from_std(tcp); + let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); + + // Get the WasiCtx of the caller(WASM), then insert_file into it + let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); + ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 }, - _ => { ("Wrong".into(), 0) } - }; - - // Creating Tcp Listener - let tcp = std::net::TcpListener::bind((addr.as_str(), port)).unwrap(); - let tcp = TcpListener::from_std(tcp); - let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); - - // Get the WasiCtx of the caller(WASM), then insert_file into it - let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); - ctx.push_file(socket_file, FileAccessMode::all()).unwrap() as i32 - }).unwrap(); + ) + .unwrap(); } // Generically link dial functions @@ -121,4 +133,4 @@ pub fn export_tcplistener_create(linker: &mut Linker) { // Err(e) => { eprintln!("Failed to define function: {}", e) }, // }; // } -// } \ No newline at end of file +// } diff --git a/crates/water/src/runtime/v1/mod.rs b/crates/water/src/runtime/v1/mod.rs index b1f56f5..5ef91ae 100644 --- a/crates/water/src/runtime/v1/mod.rs +++ b/crates/water/src/runtime/v1/mod.rs @@ -1 +1 @@ -pub mod funcs; \ No newline at end of file +pub mod funcs; diff --git a/crates/water/src/runtime/version.rs b/crates/water/src/runtime/version.rs index ac67771..fa3cdc0 100644 --- a/crates/water/src/runtime/version.rs +++ b/crates/water/src/runtime/version.rs @@ -13,7 +13,7 @@ impl Version { _ => None, // Any other string results in None } } - + pub fn as_str(&self) -> &'static str { match *self { Version::V0 => "V0", @@ -21,4 +21,4 @@ impl Version { Version::V2 => "V2", } } -} \ No newline at end of file +} diff --git a/crates/water/src/runtime/version_common/funcs.rs b/crates/water/src/runtime/version_common/funcs.rs index 934bbc6..cd7aded 100644 --- a/crates/water/src/runtime/version_common/funcs.rs +++ b/crates/water/src/runtime/version_common/funcs.rs @@ -2,18 +2,28 @@ use crate::runtime::*; // exportint a function for WASM to get CONFIG file pub fn export_config(linker: &mut Linker, config_file: String) { - linker.func_wrap("env", "request_config", move |mut caller: Caller<'_, Host>| -> i32{ + linker + .func_wrap( + "env", + "request_config", + move |mut caller: Caller<'_, Host>| -> i32 { + info!("[WASM] invoking Host exported request_config ..."); - info!("[WASM] invoking Host exported request_config ..."); - - // open the config file and insert to WASM - let dir = Dir::open_ambient_dir(".", ambient_authority()).expect("Error now able to open ambient dir"); // Open the root directory - let wasi_file = dir.open_with(&config_file, OpenOptions::new().read(true).write(true)).expect("Error now able to open config file"); - let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); - - let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); - let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all()).expect("Error with pushing file") as i32; - - config_fd - }).unwrap(); + // open the config file and insert to WASM + let dir = Dir::open_ambient_dir(".", ambient_authority()) + .expect("Error now able to open ambient dir"); // Open the root directory + let wasi_file = dir + .open_with(&config_file, OpenOptions::new().read(true).write(true)) + .expect("Error now able to open config file"); + let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); + + let ctx: &mut WasiCtx = caller.data_mut().preview1_ctx.as_mut().unwrap(); + let config_fd = ctx + .push_file(Box::new(wasi_file), FileAccessMode::all()) + .expect("Error with pushing file") as i32; + + config_fd + }, + ) + .unwrap(); } diff --git a/crates/water/src/runtime/version_common/mod.rs b/crates/water/src/runtime/version_common/mod.rs index b1f56f5..5ef91ae 100644 --- a/crates/water/src/runtime/version_common/mod.rs +++ b/crates/water/src/runtime/version_common/mod.rs @@ -1 +1 @@ -pub mod funcs; \ No newline at end of file +pub mod funcs; diff --git a/crates/water/src/utils/mod.rs b/crates/water/src/utils/mod.rs index e69de29..8b13789 100644 --- a/crates/water/src/utils/mod.rs +++ b/crates/water/src/utils/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/water_wasm/src/config.rs b/crates/water_wasm/src/config.rs index 3c67720..61e2ad6 100644 --- a/crates/water_wasm/src/config.rs +++ b/crates/water_wasm/src/config.rs @@ -40,4 +40,4 @@ impl StreamConfigV1 { name: name, } } -} \ No newline at end of file +} diff --git a/crates/water_wasm/src/connections.rs b/crates/water_wasm/src/connections.rs index 8ed5b22..e3502f8 100644 --- a/crates/water_wasm/src/connections.rs +++ b/crates/water_wasm/src/connections.rs @@ -24,10 +24,7 @@ pub struct ConnFile { impl ConnFile { // A default constructor for ConnFile pub fn new() -> Self { - ConnFile { - fd: -1, - file: None, - } + ConnFile { fd: -1, file: None } } pub fn read(&mut self, buf: &mut [u8]) -> Result { @@ -36,13 +33,11 @@ impl ConnFile { let bytes_read = match stream { ConnStream::TcpStream(stream) => { stream.read(buf).map_err(anyhow::Error::from)? - }, - ConnStream::File(stream) => { - stream.read(buf).map_err(anyhow::Error::from)? - }, + } + ConnStream::File(stream) => stream.read(buf).map_err(anyhow::Error::from)?, }; Ok(bytes_read as i64) - }, + } None => { eprintln!("[WASM] > ERROR: ConnFile's file is None"); Err(anyhow::anyhow!("ConnFile's file is None")) @@ -52,11 +47,9 @@ impl ConnFile { pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { match &mut self.file { - Some(stream) => { - match stream { - ConnStream::TcpStream(stream) => stream.write_all(buf).map_err(anyhow::Error::from), - ConnStream::File(stream) => stream.write_all(buf).map_err(anyhow::Error::from), - } + Some(stream) => match stream { + ConnStream::TcpStream(stream) => stream.write_all(buf).map_err(anyhow::Error::from), + ConnStream::File(stream) => stream.write_all(buf).map_err(anyhow::Error::from), }, None => { return Err(anyhow::anyhow!("[WASM] > ERROR: ConnFile's file is None")); @@ -129,7 +122,10 @@ impl Connection { // } /// this _read function is triggered by the Host to read from the remote connection - pub fn _read_from_outbound(self: &mut Self, decoder: &mut D) -> Result { + pub fn _read_from_outbound( + self: &mut Self, + decoder: &mut D, + ) -> Result { debug!("[WASM] running in _read_from_net"); let mut buf = vec![0u8; 4096]; @@ -138,36 +134,53 @@ impl Connection { Err(e) => { // eprintln!("[WASM] > ERROR in _read when reading from outbound: {:?}", e); // return -1; // Or another sentinel value to indicate error} - return Err(anyhow::anyhow!("[WASM] > ERROR in _read when reading from outbound: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _read when reading from outbound: {:?}", + e + )); } }; // NOTE: decode logic here let mut decoded = vec![0u8; 4096]; - let len_after_decoding = match decoder.decode(&mut buf[..bytes_read as usize], &mut decoded) { - Ok(n) => { n }, + let len_after_decoding = match decoder.decode(&mut buf[..bytes_read as usize], &mut decoded) + { + Ok(n) => n, Err(e) => { // eprintln!("[WASM] > ERROR in _write when encoding: {:?}", e); // return -1; // Or another sentinel value to indicate error - return Err(anyhow::anyhow!("[WASM] > ERROR in _write when encoding: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _write when encoding: {:?}", + e + )); } }; - match self.inbound_conn.write(decoded[..len_after_decoding as usize].as_ref()) { - Ok(_) => {}, + match self + .inbound_conn + .write(decoded[..len_after_decoding as usize].as_ref()) + { + Ok(_) => {} Err(e) => { // eprintln!("[WASM] > ERROR in _read when writing to inbound: {:?}", e); // return -1; // Or another sentinel value to indicate error - return Err(anyhow::anyhow!("[WASM] > ERROR in _read when writing to inbound: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _read when writing to inbound: {:?}", + e + )); } } Ok(len_after_decoding as i64) } - pub fn _write_2_outbound(self: &mut Self, encoder: &mut E, bytes_write: i64) -> Result { + pub fn _write_2_outbound( + self: &mut Self, + encoder: &mut E, + bytes_write: i64, + ) -> Result { debug!("[WASM] running in _write_2_net"); - + let mut bytes_read: i64 = 0; let mut buf = vec![0u8; 4096]; loop { @@ -176,50 +189,61 @@ impl Connection { Err(e) => { // eprintln!("[WASM] > ERROR in _read when reading from inbound: {:?}", e); // return -1; // Or another sentinel value to indicate error - return Err(anyhow::anyhow!("[WASM] > ERROR in _read when reading from inbound: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _read when reading from inbound: {:?}", + e + )); } }; - + bytes_read += read; - + if read == 0 || bytes_read == bytes_write { break; } } - + // NOTE: encode logic here let mut encoded = vec![0u8; 4096]; - let len_after_encoding = match encoder.encode(&mut buf[..bytes_read as usize], &mut encoded) { - Ok(n) => { n }, + let len_after_encoding = match encoder.encode(&mut buf[..bytes_read as usize], &mut encoded) + { + Ok(n) => n, Err(e) => { // eprintln!("[WASM] > ERROR in _write when encoding: {:?}", e); // return -1; // Or another sentinel value to indicate error - return Err(anyhow::anyhow!("[WASM] > ERROR in _write when encoding: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _write when encoding: {:?}", + e + )); } }; - - match self.outbound_conn.write(encoded[..len_after_encoding as usize].as_ref()) { - Ok(_) => {}, + + match self + .outbound_conn + .write(encoded[..len_after_encoding as usize].as_ref()) + { + Ok(_) => {} Err(e) => { // eprintln!("[WASM] > ERROR in _read when writing to outbound: {:?}", e); // return -1; // Or another sentinel value to indicate error - return Err(anyhow::anyhow!("[WASM] > ERROR in _read when writing to outbound: {:?}", e)); + return Err(anyhow::anyhow!( + "[WASM] > ERROR in _read when writing to outbound: {:?}", + e + )); } } - + Ok(len_after_encoding as i64) } pub fn close_inbound(self: &mut Self) { match &mut self.inbound_conn.file { - Some(stream) => { - match stream { - ConnStream::TcpStream(stream) => { - stream.shutdown(std::net::Shutdown::Both).unwrap(); - }, - ConnStream::File(stream) => { - stream.sync_all().unwrap(); - }, + Some(stream) => match stream { + ConnStream::TcpStream(stream) => { + stream.shutdown(std::net::Shutdown::Both).unwrap(); + } + ConnStream::File(stream) => { + stream.sync_all().unwrap(); } }, None => { @@ -232,14 +256,12 @@ impl Connection { pub fn close_outbound(self: &mut Self) { match &mut self.outbound_conn.file { - Some(stream) => { - match stream { - ConnStream::TcpStream(stream) => { - stream.shutdown(std::net::Shutdown::Both).unwrap(); - }, - ConnStream::File(stream) => { - stream.sync_all().unwrap(); - }, + Some(stream) => match stream { + ConnStream::TcpStream(stream) => { + stream.shutdown(std::net::Shutdown::Both).unwrap(); + } + ConnStream::File(stream) => { + stream.sync_all().unwrap(); } }, None => { diff --git a/crates/water_wasm/src/decoder.rs b/crates/water_wasm/src/decoder.rs index b55d3b2..3c608a6 100644 --- a/crates/water_wasm/src/decoder.rs +++ b/crates/water_wasm/src/decoder.rs @@ -21,5 +21,9 @@ impl Decoder for DefaultDecoder { } pub trait AsyncDecodeReader { - fn poll_read_decrypted(&mut self, stream: &mut S, buf: &mut [u8]) -> Result; -} \ No newline at end of file + fn poll_read_decrypted( + &mut self, + stream: &mut S, + buf: &mut [u8], + ) -> Result; +} diff --git a/crates/water_wasm/src/dialer.rs b/crates/water_wasm/src/dialer.rs index 1ea0777..7d61b33 100644 --- a/crates/water_wasm/src/dialer.rs +++ b/crates/water_wasm/src/dialer.rs @@ -1,6 +1,6 @@ use super::*; -use anyhow::{Ok, anyhow}; +use anyhow::{anyhow, Ok}; pub struct Dialer { pub file_conn: Connection, @@ -17,40 +17,47 @@ impl Dialer { pub fn dial(&mut self) -> Result { info!("[WASM] running in dial func..."); - + let mut fd: i32 = -1; - + // FIXME: hardcoded the filename for now, make it a config later fd = self.tcp_connect()?; - + if fd < 0 { eprintln!("failed to create connection to remote"); return Err(anyhow!("failed to create connection to remote")); } - - self.file_conn.set_outbound(fd, ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) })); + + self.file_conn.set_outbound( + fd, + ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) }), + ); Ok(fd) } fn tcp_connect(&self) -> Result { - let stream = StreamConfigV1::init(self.config.remote_address.clone(), self.config.remote_port, "CONNECT_REMOTE".to_string()); - + let stream = StreamConfigV1::init( + self.config.remote_address.clone(), + self.config.remote_port, + "CONNECT_REMOTE".to_string(), + ); + let encoded: Vec = bincode::serialize(&stream).expect("Failed to serialize"); - + let address = encoded.as_ptr() as u32; let size = encoded.len() as u32; - + let mut fd = -1; unsafe { // connect_tcp_unix(len, xxxx) fd = connect_tcp(address, size); }; - + if fd < 0 { return Err(anyhow!("failed to create listener")); } - + Ok(fd) } -} \ No newline at end of file +} diff --git a/crates/water_wasm/src/encoder.rs b/crates/water_wasm/src/encoder.rs index e73f662..46adf05 100644 --- a/crates/water_wasm/src/encoder.rs +++ b/crates/water_wasm/src/encoder.rs @@ -21,5 +21,8 @@ impl Encoder for DefaultEncoder { } pub trait AsyncEncodeWriter { - fn poll_write_encrypted(&mut self, stream: &mut S) -> Result; -} \ No newline at end of file + fn poll_write_encrypted( + &mut self, + stream: &mut S, + ) -> Result; +} diff --git a/crates/water_wasm/src/lib.rs b/crates/water_wasm/src/lib.rs index f7b60db..f16ca21 100644 --- a/crates/water_wasm/src/lib.rs +++ b/crates/water_wasm/src/lib.rs @@ -1,20 +1,19 @@ - // lib.rs // export all modules pub mod config; -pub mod dialer; pub mod connections; -pub mod version; -pub mod encoder; pub mod decoder; +pub mod dialer; +pub mod encoder; +pub mod version; // pub mod net; // pub mod listener_in_wasm; pub use config::*; -pub use dialer::*; pub use connections::*; -pub use encoder::*; pub use decoder::*; +pub use dialer::*; +pub use encoder::*; // pub use net::*; // pub use listener_in_wasm::*; @@ -29,12 +28,11 @@ use std::{ vec, }; -use tracing::{info, debug}; use bincode::{self}; +use tracing::{debug, info}; use anyhow::Result; -use serde::{Serialize, Deserialize}; - +use serde::{Deserialize, Serialize}; // TODO: move these to speicific implementations, shouldn't be in the crate lib // =================== WASM Imports ===================== @@ -42,4 +40,4 @@ extern "C" { // #[link_name = "create-listen"] pub fn create_listen(ptr: u32, size: u32) -> i32; pub fn connect_tcp(ptr: u32, size: u32) -> i32; -} \ No newline at end of file +} diff --git a/crates/water_wasm/src/version.rs b/crates/water_wasm/src/version.rs index a16fe68..5cfb839 100644 --- a/crates/water_wasm/src/version.rs +++ b/crates/water_wasm/src/version.rs @@ -1,3 +1,3 @@ // must have something like this in your WASM module, the following is just an example // #[export_name = "V0"] -// pub static V0: i32 = 0; \ No newline at end of file +// pub static V0: i32 = 0; diff --git a/examples/clients/cli/src/cli.rs b/examples/clients/cli/src/cli.rs index ef2aa76..6343263 100644 --- a/examples/clients/cli/src/cli.rs +++ b/examples/clients/cli/src/cli.rs @@ -1,9 +1,8 @@ +use water::globals::{CONFIG_WASM_PATH, MAIN, WASM_PATH}; use water::{config::WATERConfig, runtime}; -use water::globals::{WASM_PATH, MAIN, CONFIG_WASM_PATH}; use clap::Parser; - #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -32,7 +31,6 @@ struct Args { debug: bool, } - impl From for WATERConfig { fn from(args: Args) -> Self { Self { @@ -66,9 +64,9 @@ pub fn execute(conf: WATERConfig) -> Result<(), anyhow::Error> { // // keep reading from stdin and call read and write function from water_client.stream // let mut buf = String::new(); // std::io::stdin().read_line(&mut buf)?; - + // water_client.write(buf.as_bytes())?; - + // let mut buf = vec![0; 1024]; // water_client.read(&mut buf)?; @@ -76,4 +74,4 @@ pub fn execute(conf: WATERConfig) -> Result<(), anyhow::Error> { // } Ok(()) -} \ No newline at end of file +} diff --git a/examples/clients/cli/src/main.rs b/examples/clients/cli/src/main.rs index 4996ecc..e21d360 100644 --- a/examples/clients/cli/src/main.rs +++ b/examples/clients/cli/src/main.rs @@ -1,7 +1,7 @@ +extern crate anyhow; extern crate clap; -extern crate tracing_subscriber; extern crate tracing; -extern crate anyhow; +extern crate tracing_subscriber; extern crate water; @@ -9,11 +9,8 @@ use tracing::Level; mod cli; - fn main() -> Result<(), anyhow::Error> { - tracing_subscriber::fmt() - .with_max_level(Level::INFO) - .init(); + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); cli::parse_and_execute() } diff --git a/examples/water_bins/echo_client/src/async_socks5_listener.rs b/examples/water_bins/echo_client/src/async_socks5_listener.rs index 9977dd7..4904111 100644 --- a/examples/water_bins/echo_client/src/async_socks5_listener.rs +++ b/examples/water_bins/echo_client/src/async_socks5_listener.rs @@ -1,14 +1,14 @@ use super::*; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::{TcpListener, TcpStream}, time, time::timeout, }; use tracing_subscriber::fmt::format; -use std::net::{ToSocketAddrs, TcpStream as StdTcpStream, SocketAddr}; +use std::net::{SocketAddr, TcpStream as StdTcpStream, ToSocketAddrs}; // ----------------------- Listener methods ----------------------- #[export_name = "v1_listen"] @@ -21,15 +21,22 @@ fn _listener_creation() -> Result { Ok(conf) => conf, Err(e) => { eprintln!("[WASM] > ERROR: {}", e); - return Err(std::io::Error::new(std::io::ErrorKind::Other, "failed to lock config")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to lock config", + )); } }; // FIXME: hardcoded the filename for now, make it a config later - let stream = StreamConfigV1::init(global_conn.config.local_address.clone(), global_conn.config.local_port, "LISTEN".to_string()); - + let stream = StreamConfigV1::init( + global_conn.config.local_address.clone(), + global_conn.config.local_port, + "LISTEN".to_string(), + ); + let encoded: Vec = bincode::serialize(&stream).expect("Failed to serialize"); - + let address = encoded.as_ptr() as u32; let size = encoded.len() as u32; @@ -39,11 +46,17 @@ fn _listener_creation() -> Result { }; if fd < 0 { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "failed to create listener")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to create listener", + )); } - info!("[WASM] ready to start listening at {}:{}", global_conn.config.local_address, global_conn.config.local_port); - + info!( + "[WASM] ready to start listening at {}:{}", + global_conn.config.local_address, global_conn.config.local_port + ); + Ok(fd) } @@ -67,7 +80,7 @@ async fn wrapper() -> std::io::Result<()> { continue; } }; - + // Spawn a background task for each new connection. tokio::spawn(async move { eprintln!("[WASM] > CONNECTED"); @@ -84,56 +97,84 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { let mut buffer = [0; 512]; // Read the SOCKS5 greeting - let nbytes = stream.read(&mut buffer).await.expect("Failed to read from stream"); + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); - println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); + println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); if nbytes < 2 || buffer[0] != 0x05 { eprintln!("Not a SOCKS5 request"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Not a SOCKS5 request")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Not a SOCKS5 request", + )); } let nmethods = buffer[1] as usize; if nbytes < 2 + nmethods { eprintln!("Incomplete SOCKS5 greeting"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Incomplete SOCKS5 greeting")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete SOCKS5 greeting", + )); } // For simplicity, always use "NO AUTHENTICATION REQUIRED" - stream.write_all(&[0x05, 0x00]).await.expect("Failed to write to stream"); + stream + .write_all(&[0x05, 0x00]) + .await + .expect("Failed to write to stream"); // Read the actual request - let nbytes = stream.read(&mut buffer).await.expect("Failed to read from stream"); + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); if nbytes < 7 || buffer[0] != 0x05 || buffer[1] != 0x01 { println!("Invalid SOCKS5 request"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid SOCKS5 request")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid SOCKS5 request", + )); } // Extract address and port let addr: SocketAddr = match buffer[3] { - 0x01 => { // IPv4 + 0x01 => { + // IPv4 if nbytes < 10 { eprintln!("Incomplete request for IPv4 address"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Incomplete request for IPv4 address")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for IPv4 address", + )); } let addr = std::net::Ipv4Addr::new(buffer[4], buffer[5], buffer[6], buffer[7]); let port = u16::from_be_bytes([buffer[8], buffer[9]]); SocketAddr::from((addr, port)) - }, - 0x03 => { // Domain name + } + 0x03 => { + // Domain name let domain_length = buffer[4] as usize; if nbytes < domain_length + 5 { eprintln!("Incomplete request for domain name"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Incomplete request for domain name")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for domain name", + )); } - let domain = std::str::from_utf8(&buffer[5..5+domain_length]).expect("Invalid domain string"); + let domain = + std::str::from_utf8(&buffer[5..5 + domain_length]).expect("Invalid domain string"); println!("Domain: {}", domain); - let port = u16::from_be_bytes([buffer[5+domain_length], buffer[5+domain_length+1]]); + let port = + u16::from_be_bytes([buffer[5 + domain_length], buffer[5 + domain_length + 1]]); println!("Port: {}", port); @@ -145,24 +186,33 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { Some(addr) => addr, None => { eprintln!("Domain resolved, but no addresses found for {}", domain); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Domain resolved, but no addresses found for {}", domain))); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Domain resolved, but no addresses found for {}", domain), + )); } }, Err(e) => { eprintln!("Failed to resolve domain {}: {}", domain, e); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, format!("Failed to resolve domain {}: {}", domain, e))); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Failed to resolve domain {}: {}", domain, e), + )); } } - }, + } _ => { eprintln!("Address type not supported"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Address type not supported")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Address type not supported", + )); } }; // NOTE: create a new Dialer to dial any target address as it wants to // Add more features later -- connect to target thru rules (direct / server) - + // Connect to target address let mut tcp_dialer = Dialer::new(); tcp_dialer.config.remote_address = addr.ip().to_string(); @@ -174,14 +224,20 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { ConnStream::TcpStream(s) => s, _ => { eprintln!("Failed to get outbound tcp stream"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Failed to get outbound tcp stream")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get outbound tcp stream", + )); } }; - target_stream.set_nonblocking(true).expect("Failed to set non-blocking"); + target_stream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + + let target_stream = + TcpStream::from_std(target_stream).expect("Failed to convert to tokio stream"); - let target_stream = TcpStream::from_std(target_stream).expect("Failed to convert to tokio stream"); - // Construct the response based on the target address let response = match addr { SocketAddr::V4(a) => { @@ -189,16 +245,19 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { r.extend_from_slice(&a.ip().octets()); r.extend_from_slice(&a.port().to_be_bytes()); r - }, + } SocketAddr::V6(a) => { let mut r = vec![0x05, 0x00, 0x00, 0x04]; r.extend_from_slice(&a.ip().octets()); r.extend_from_slice(&a.port().to_be_bytes()); r - }, + } }; - stream.write_all(&response).await.expect("Failed to write to stream"); + stream + .write_all(&response) + .await + .expect("Failed to write to stream"); let (mut client_read, mut client_write) = tokio::io::split(stream); let (mut target_read, mut target_write) = tokio::io::split(target_stream); @@ -219,7 +278,7 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { } } }; - + let target_to_client = async move { let mut buffer = vec![0; 4096]; loop { @@ -241,4 +300,4 @@ async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { tokio::join!(client_to_target, target_to_client); Ok(()) -} \ No newline at end of file +} diff --git a/examples/water_bins/echo_client/src/lib.rs b/examples/water_bins/echo_client/src/lib.rs index 1a41ce8..2b9903d 100644 --- a/examples/water_bins/echo_client/src/lib.rs +++ b/examples/water_bins/echo_client/src/lib.rs @@ -1,31 +1,31 @@ // =================== Imports & Modules ===================== use std::{ io::{self, Read, Write}, - os::fd::IntoRawFd, os::fd::FromRawFd, + os::fd::IntoRawFd, sync::Mutex, vec, }; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::{TcpListener, TcpStream}, time, time::timeout, }; +use bincode::{self}; use lazy_static::lazy_static; use serde_json; -use tracing::{info, Level, debug}; +use tracing::{debug, info, Level}; use tracing_subscriber; -use bincode::{self}; +use anyhow::{Context, Result}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::fs::File; use std::mem; -use anyhow::{Context, Result}; -use serde::{Serialize, Deserialize, de::DeserializeOwned}; -use tokio_util::codec::{AnyDelimiterCodec, Framed, FramedParts}; use std::time::Duration; +use tokio_util::codec::{AnyDelimiterCodec, Framed, FramedParts}; use water_wasm::*; @@ -41,11 +41,9 @@ lazy_static! { #[export_name = "_init"] pub fn _init(debug: bool) { if debug { - tracing_subscriber::fmt() - .with_max_level(Level::INFO) - .init(); + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); } - + info!("[WASM] running in _init"); } @@ -59,7 +57,10 @@ pub fn _water_bridging(fd: i32) { } }; - global_dialer.file_conn.set_inbound(fd, ConnStream::File(unsafe { std::fs::File::from_raw_fd(fd) })); + global_dialer.file_conn.set_inbound( + fd, + ConnStream::File(unsafe { std::fs::File::from_raw_fd(fd) }), + ); } #[export_name = "_set_outbound"] @@ -72,7 +73,10 @@ pub fn _water_bridging_out(fd: i32) { } }; - global_dialer.file_conn.set_outbound(fd, ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) })); + global_dialer.file_conn.set_outbound( + fd, + ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) }), + ); } #[export_name = "_config"] @@ -98,12 +102,15 @@ pub fn _process_config(fd: i32) { return; } }; - + // global_dialer.file_conn.config = config.clone(); global_dialer.config = config; - }, + } Err(e) => { - eprintln!("[WASM] > WASM _process_config falied reading path ERROR: {}", e); + eprintln!( + "[WASM] > WASM _process_config falied reading path ERROR: {}", + e + ); return; } }; @@ -119,7 +126,10 @@ pub fn _write(bytes_write: i64) -> i64 { } }; - match global_dialer.file_conn._write_2_outbound(&mut DefaultEncoder, bytes_write) { + match global_dialer + .file_conn + ._write_2_outbound(&mut DefaultEncoder, bytes_write) + { Ok(n) => n, Err(e) => { eprintln!("[WASM] > ERROR in _write: {}", e); @@ -138,7 +148,10 @@ pub fn _read() -> i64 { } }; - match global_dialer.file_conn._read_from_outbound(&mut DefaultDecoder) { + match global_dialer + .file_conn + ._read_from_outbound(&mut DefaultDecoder) + { Ok(n) => n, Err(e) => { eprintln!("[WASM] > ERROR in _read: {}", e); @@ -158,7 +171,7 @@ pub fn _dial() { }; match global_dialer.dial() { - Ok(_) => {}, + Ok(_) => {} Err(e) => { eprintln!("[WASM] > ERROR in _dial: {}", e); return; diff --git a/examples/water_bins/ss_client_wasm_v1/src/aead.rs b/examples/water_bins/ss_client_wasm_v1/src/aead.rs index 89bb8ee..55431ec 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/aead.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/aead.rs @@ -1,10 +1,10 @@ use super::*; -use bytes::{BufMut, BytesMut, Bytes}; +use bytes::{BufMut, Bytes, BytesMut}; -use tokio::io::ReadBuf; use byte_string::ByteStr; use std::io::ErrorKind; +use tokio::io::ReadBuf; use std::task::{self, Poll}; @@ -92,8 +92,12 @@ impl DecryptedReader { self.state = DecryptReadState::BufferedData { pos: 0 }; } DecryptReadState::BufferedData { ref mut pos } => { - info!("buffered data, pos: {}, buffer len: {}", pos, self.buffer.len()); - + info!( + "buffered data, pos: {}, buffer len: {}", + pos, + self.buffer.len() + ); + if *pos < self.buffer.len() { let buffered = &self.buffer[*pos..]; @@ -113,7 +117,12 @@ impl DecryptedReader { } } - fn poll_read_salt(&mut self, cx: &mut task::Context<'_>, stream: &mut S, key: &[u8]) -> Poll> + fn poll_read_salt( + &mut self, + cx: &mut task::Context<'_>, + stream: &mut S, + key: &[u8], + ) -> Poll> where S: AsyncRead + Unpin + ?Sized, { @@ -125,7 +134,7 @@ impl DecryptedReader { } let salt = &self.buffer[..salt_len]; - + // #442 Remember salt in filter after first successful decryption. // If we check salt right here will allow attacker to flood our filter and eventually block all of our legitimate clients' requests. self.salt = Some(Bytes::copy_from_slice(salt)); @@ -139,7 +148,11 @@ impl DecryptedReader { Ok(()).into() } - fn poll_read_length(&mut self, cx: &mut task::Context<'_>, stream: &mut S) -> Poll>> + fn poll_read_length( + &mut self, + cx: &mut task::Context<'_>, + stream: &mut S, + ) -> Poll>> where S: AsyncRead + Unpin + ?Sized, { @@ -189,7 +202,12 @@ impl DecryptedReader { Ok(()).into() } - fn poll_read_exact(&mut self, cx: &mut task::Context<'_>, stream: &mut S, size: usize) -> Poll> + fn poll_read_exact( + &mut self, + cx: &mut task::Context<'_>, + stream: &mut S, + size: usize, + ) -> Poll> where S: AsyncRead + Unpin + ?Sized, { @@ -199,15 +217,16 @@ impl DecryptedReader { let remaining = size - self.buffer.len(); debug!("buffer was {:?}", ByteStr::new(&self.buffer)); - + let buffer = &mut self.buffer.chunk_mut()[..remaining]; - let mut read_buf = - ReadBuf::uninit(unsafe { slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut _, remaining) }); + let mut read_buf = ReadBuf::uninit(unsafe { + slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut _, remaining) + }); ready!(Pin::new(&mut *stream).poll_read(cx, &mut read_buf))?; let n = read_buf.filled().len(); - + if n == 0 { if !self.buffer.is_empty() { return Err(ErrorKind::UnexpectedEof.into()).into(); @@ -215,7 +234,7 @@ impl DecryptedReader { return Ok(0).into(); } } - + unsafe { self.buffer.advance_mut(n); } @@ -330,7 +349,8 @@ impl EncryptedWriter { } EncryptWriteState::Writing { ref mut pos } => { while *pos < self.buffer.len() { - let n = ready!(Pin::new(&mut *stream).poll_write(cx, &self.buffer[*pos..]))?; + let n = + ready!(Pin::new(&mut *stream).poll_write(cx, &self.buffer[*pos..]))?; if n == 0 { return Err(ErrorKind::UnexpectedEof.into()).into(); } @@ -346,4 +366,4 @@ impl EncryptedWriter { } } } -} \ No newline at end of file +} diff --git a/examples/water_bins/ss_client_wasm_v1/src/client.rs b/examples/water_bins/ss_client_wasm_v1/src/client.rs index 042c8bd..036b614 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/client.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/client.rs @@ -25,7 +25,11 @@ where S: AsyncRead + AsyncWrite + Unpin, { #[inline] - fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { #[allow(unused_mut)] let mut this = self.project(); return this.stream.poll_read_decrypted(cx, buf).map_err(Into::into); @@ -36,7 +40,11 @@ impl AsyncWrite for ProxyClientStream where S: AsyncRead + AsyncWrite + Unpin, { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { let this = self.project(); loop { @@ -56,7 +64,10 @@ where return Ok(buf.len()).into(); } ProxyClientStreamWriteState::Connected => { - return this.stream.poll_write_encrypted(cx, buf).map_err(Into::into); + return this + .stream + .poll_write_encrypted(cx, buf) + .map_err(Into::into); } } } @@ -68,25 +79,29 @@ where } #[inline] - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { self.project().stream.poll_shutdown(cx).map_err(Into::into) } } -impl ProxyClientStream +impl ProxyClientStream where S: AsyncRead + AsyncWrite + Unpin, { - pub fn from_stream(stream: S, addr: A, method: CipherKind, key: &[u8]) -> ProxyClientStream + pub fn from_stream( + stream: S, + addr: A, + method: CipherKind, + key: &[u8], + ) -> ProxyClientStream where A: Into
, { let addr = addr.into(); - let stream = CryptoStream::from_stream_with_identity( - stream, - method, - key, - ); + let stream = CryptoStream::from_stream_with_identity(stream, method, key); let reader_state = ProxyClientStreamReadState::Established; @@ -96,7 +111,6 @@ where reader_state, } } - } #[inline] @@ -122,4 +136,4 @@ pub fn make_first_packet_buffer(method: CipherKind, addr: &Address, buf: &[u8]) info!("[after making first packet] {:?}", slice); buffer -} \ No newline at end of file +} diff --git a/examples/water_bins/ss_client_wasm_v1/src/crypto_io.rs b/examples/water_bins/ss_client_wasm_v1/src/crypto_io.rs index b4d3995..63819ff 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/crypto_io.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/crypto_io.rs @@ -4,7 +4,8 @@ use std::task::{self}; lazy_static! { pub static ref ENC_CIPHER: Mutex = Mutex::new(Cipher::new(CipherKind::NONE, &[], &[])); - pub static ref DEC_CIPHER: Mutex = Mutex::new(DecryptedReader::new(CipherKind::NONE, &[])); + pub static ref DEC_CIPHER: Mutex = + Mutex::new(DecryptedReader::new(CipherKind::NONE, &[])); } /// AEAD Protocol Error @@ -89,7 +90,7 @@ where .. } = *self; ready!(dec.poll_read_decrypted(cx, stream, buf))?; - + if !*has_handshaked && dec.handshaked() { *has_handshaked = true; } @@ -114,7 +115,8 @@ where ref mut stream, .. } = *self; - enc.poll_write_encrypted(cx, stream, buf).map_err(Into::into) + enc.poll_write_encrypted(cx, stream, buf) + .map_err(Into::into) } } @@ -125,17 +127,20 @@ where /// Polls `flush` on the underlying stream #[inline] pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx).map_err(Into::into) + Pin::new(&mut self.stream) + .poll_flush(cx) + .map_err(Into::into) } /// Polls `shutdown` on the underlying stream #[inline] pub fn poll_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_shutdown(cx).map_err(Into::into) + Pin::new(&mut self.stream) + .poll_shutdown(cx) + .map_err(Into::into) } } - impl CryptoStream { pub fn new(stream: S, method: CipherKind, key: &[u8], nonce: &[u8]) -> CryptoStream { CryptoStream { @@ -148,11 +153,7 @@ impl CryptoStream { } /// Create a new CryptoStream with the underlying stream connection - pub fn from_stream_with_identity( - stream: S, - method: CipherKind, - key: &[u8], - ) -> CryptoStream { + pub fn from_stream_with_identity(stream: S, method: CipherKind, key: &[u8]) -> CryptoStream { let category = method.category(); let prev_len = method.salt_len(); @@ -195,4 +196,4 @@ pub fn generate_nonce(method: CipherKind, nonce: &mut [u8], unique: bool) { pub fn create_cipher(key: &[u8], nonce: &[u8], kind: CipherKind) -> Cipher { let cipher = Cipher::new(kind, key, nonce); return cipher; -} \ No newline at end of file +} diff --git a/examples/water_bins/ss_client_wasm_v1/src/lib.rs b/examples/water_bins/ss_client_wasm_v1/src/lib.rs index 6a81d8a..f61fda0 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/lib.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/lib.rs @@ -33,27 +33,23 @@ use tracing::{debug, info, Level}; use tracing_subscriber; // =================== MODULES =================== -pub mod water; pub mod aead; -pub mod socks5; -pub mod crypto_io; pub mod client; +pub mod crypto_io; +pub mod socks5; pub mod utils; +pub mod water; // =================== DEPENDENCIES FROM MODULES =================== +use aead::{DecryptedReader, EncryptedWriter}; +use client::*; +use crypto_io::*; use socks5::*; use utils::*; -use crypto_io::*; -use client::*; use water_wasm::*; -use aead::{DecryptedReader, EncryptedWriter}; // =================== SHADOWSOCKS_CRYPTO =================== -use shadowsocks_crypto::{ - v1::Cipher, - v1::random_iv_or_salt, - CipherKind, -}; +use shadowsocks_crypto::{v1::random_iv_or_salt, v1::Cipher, CipherKind}; // Export version info #[export_name = "V1"] diff --git a/examples/water_bins/ss_client_wasm_v1/src/socks5.rs b/examples/water_bins/ss_client_wasm_v1/src/socks5.rs index 5db09ad..230af70 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/socks5.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/socks5.rs @@ -48,7 +48,9 @@ pub fn get_addr_len(atyp: &Address) -> usize { pub fn write_address(addr: &Address, buf: &mut B) { match *addr { Address::SocketAddress(ref addr) => write_socket_address(addr, buf), - Address::DomainNameAddress(ref dnaddr, ref port) => write_domain_name_address(dnaddr, *port, buf), + Address::DomainNameAddress(ref dnaddr, ref port) => { + write_domain_name_address(dnaddr, *port, buf) + } } } @@ -101,69 +103,111 @@ impl Socks5Handler { pub async fn socks5_greet(self: &mut Self) -> Result<(), std::io::Error> { // Read the SOCKS5 greeting - self.stream.read_buf(&mut self.buffer).await.expect("Failed to read from stream"); - - info!("SOCKS5 greeting: Received {} bytes: {:?}", self.buffer.len(), self.buffer.to_vec()); - + self.stream + .read_buf(&mut self.buffer) + .await + .expect("Failed to read from stream"); + + info!( + "SOCKS5 greeting: Received {} bytes: {:?}", + self.buffer.len(), + self.buffer.to_vec() + ); + if self.buffer.len() < 2 || self.buffer[0] != 0x05 { eprintln!("Not a SOCKS5 request"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Not a SOCKS5 request")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Not a SOCKS5 request", + )); } - + let nmethods = self.buffer[1] as usize; if self.buffer.len() < 2 + nmethods { eprintln!("Incomplete SOCKS5 greeting"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Incomplete SOCKS5 greeting")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Incomplete SOCKS5 greeting", + )); } - + // For simplicity, always use "NO AUTHENTICATION REQUIRED" - self.stream.write_all(&[0x05, 0x00]).await.expect("Failed to write to stream"); - + self.stream + .write_all(&[0x05, 0x00]) + .await + .expect("Failed to write to stream"); + self.buffer.clear(); Ok(()) } - + pub async fn socks5_get_target(self: &mut Self) -> Result { // Read the actual request - self.stream.read_buf(&mut self.buffer).await.expect("Failed to read from stream"); - - info!("Actual SOCKS5 request: Received {} bytes: {:?}", self.buffer.len(), self.buffer.to_vec()); - + self.stream + .read_buf(&mut self.buffer) + .await + .expect("Failed to read from stream"); + + info!( + "Actual SOCKS5 request: Received {} bytes: {:?}", + self.buffer.len(), + self.buffer.to_vec() + ); + if self.buffer.len() < 7 || self.buffer[0] != 0x05 || self.buffer[1] != 0x01 { println!("Invalid SOCKS5 request"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid SOCKS5 request")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid SOCKS5 request", + )); } - + // Extract address and port let target_addr: Address = match self.buffer[3] { - 0x01 => { // IPv4 + 0x01 => { + // IPv4 if self.buffer.len() < 10 { eprintln!("Incomplete request for IPv4 address"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Incomplete request for IPv4 address")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Incomplete request for IPv4 address", + )); } - let addr = std::net::Ipv4Addr::new(self.buffer[4], self.buffer[5], self.buffer[6], self.buffer[7]); + let addr = std::net::Ipv4Addr::new( + self.buffer[4], + self.buffer[5], + self.buffer[6], + self.buffer[7], + ); let port = (&self.buffer[8..10]).get_u16(); Address::SocketAddress(SocketAddr::from((addr, port))) - }, - 0x03 => { // Domain name + } + 0x03 => { + // Domain name let domain_length = self.buffer[4] as usize; if self.buffer.len() < domain_length + 5 { eprintln!("Incomplete request for domain name"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Incomplete request for domain name")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Incomplete request for domain name", + )); } - let domain = std::str::from_utf8(&self.buffer[5..5+domain_length]).expect("Invalid domain string"); - - - let port = (&self.buffer[5+domain_length..5+domain_length+2]).get_u16(); - + let domain = std::str::from_utf8(&self.buffer[5..5 + domain_length]) + .expect("Invalid domain string"); + + let port = (&self.buffer[5 + domain_length..5 + domain_length + 2]).get_u16(); + info!("Requested Domain:port: {}:{}", domain, port); - + Address::DomainNameAddress(domain.to_string(), port) - }, + } _ => { eprintln!("Address type not supported"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Address type not supported")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Address type not supported", + )); } }; @@ -172,7 +216,10 @@ impl Socks5Handler { pub async fn socks5_response(self: &mut Self, buf: &mut BytesMut) { // Send the response header - self.stream.write_all(&buf).await.expect("Failed to write back to client's stream"); + self.stream + .write_all(&buf) + .await + .expect("Failed to write back to client's stream"); info!("Responsed header to SOCKS5 client: {:?}", buf.to_vec()); } -} \ No newline at end of file +} diff --git a/examples/water_bins/ss_client_wasm_v1/src/utils.rs b/examples/water_bins/ss_client_wasm_v1/src/utils.rs index 2b25533..6f95a3a 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/utils.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/utils.rs @@ -165,8 +165,7 @@ where Ok(..) => { info!( "copy bidirection ends, a_to_b: {:?}, b_to_a: {:?}", - self.a_to_b, - self.b_to_a + self.a_to_b, self.b_to_a ); } Err(ref err) => { diff --git a/examples/water_bins/ss_client_wasm_v1/src/water.rs b/examples/water_bins/ss_client_wasm_v1/src/water.rs index 3c15b75..9c424f0 100644 --- a/examples/water_bins/ss_client_wasm_v1/src/water.rs +++ b/examples/water_bins/ss_client_wasm_v1/src/water.rs @@ -5,11 +5,9 @@ use bytes::{BufMut, BytesMut}; #[export_name = "_init"] pub fn _init(debug: bool) { if debug { - tracing_subscriber::fmt() - .with_max_level(Level::INFO) - .init(); + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); } - + info!("[WASM] running in _init"); } @@ -36,12 +34,15 @@ pub fn _process_config(fd: i32) { return; } }; - + // global_dialer.file_conn.config = config.clone(); global_dialer.config = config; - }, + } Err(e) => { - eprintln!("[WASM] > WASM _process_config falied reading path ERROR: {}", e); + eprintln!( + "[WASM] > WASM _process_config falied reading path ERROR: {}", + e + ); return; } }; @@ -75,7 +76,7 @@ async fn _start_listen() -> std::io::Result<()> { continue; } }; - + // Spawn a background task for each new connection. tokio::spawn(async move { eprintln!("[WASM] > CONNECTED"); @@ -92,12 +93,15 @@ async fn _handle_connection(stream: TcpStream) -> std::io::Result<()> { let mut inbound_con = Socks5Handler::new(stream); inbound_con.socks5_greet().await.expect("Failed to greet"); - let target_addr = inbound_con.socks5_get_target().await.expect("Failed to get target address"); + let target_addr = inbound_con + .socks5_get_target() + .await + .expect("Failed to get target address"); let server_stream = _dial_server().expect("Failed to dial to SS-Server"); - + // FIXME: hardcoded server ip:address for now + only support connection with ip:port let server_addr = Address::SocketAddress(SocketAddr::from(([127, 0, 0, 1], 8388))); - + // Constructing the response header let mut buf = BytesMut::with_capacity(server_addr.serialized_len()); buf.put_slice(&[consts::SOCKS5_VERSION, consts::SOCKS5_REPLY_SUCCEEDED, 0x00]); @@ -106,17 +110,18 @@ async fn _handle_connection(stream: TcpStream) -> std::io::Result<()> { inbound_con.socks5_response(&mut buf).await; // FIXME: hardcoded the key which derived from the password: "Test!23" - let key = [128, 218, 128, 160, 125, 72, 115, 9, 187, 165, 163, 169, 92, 177, 35, 201, 49, 245, 92, 203, 57, 152, 63, 149, 108, 132, 60, 128, 201, 206, 82, 226]; + let key = [ + 128, 218, 128, 160, 125, 72, 115, 9, 187, 165, 163, 169, 92, 177, 35, 201, 49, 245, 92, + 203, 57, 152, 63, 149, 108, 132, 60, 128, 201, 206, 82, 226, + ]; // creating the client proxystream -- contains cryptostream with both AsyncRead and AsyncWrite implemented let mut proxy = ProxyClientStream::from_stream(server_stream, target_addr, CIPHER_METHOD, &key); - match copy_encrypted_bidirectional(CIPHER_METHOD, &mut proxy, &mut inbound_con.stream).await - { + match copy_encrypted_bidirectional(CIPHER_METHOD, &mut proxy, &mut inbound_con.stream).await { Ok((wn, rn)) => { info!( "tcp tunnel (proxied) closed, L2R {} bytes, R2L {} bytes", - rn, - wn + rn, wn ); } Err(err) => { @@ -141,13 +146,19 @@ pub fn _dial_server() -> Result { ConnStream::TcpStream(s) => s, _ => { eprintln!("Failed to get outbound tcp stream"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Failed to get outbound tcp stream")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get outbound tcp stream", + )); } }; // NOTE: can convert to a async tokio TcpStream if wanted / needed - server_stream.set_nonblocking(true).expect("Failed to set non-blocking"); - let server_stream = TcpStream::from_std(server_stream).expect("Failed to convert to tokio stream"); + server_stream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + let server_stream = + TcpStream::from_std(server_stream).expect("Failed to convert to tokio stream"); info!("[Connected] to SS-Server"); @@ -163,20 +174,26 @@ pub fn _direct_connect() { let mut tcp_dialer = Dialer::new(); tcp_dialer.config.remote_address = addr.ip().to_string(); tcp_dialer.config.remote_port = addr.port() as u32; - + let tcp_fd = tcp_dialer.dial().expect("Failed to dial"); - + let server_stream = match tcp_dialer.file_conn.outbound_conn.file.unwrap() { ConnStream::TcpStream(s) => s, _ => { eprintln!("Failed to get outbound tcp stream"); - return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "Failed to get outbound tcp stream")); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get outbound tcp stream", + )); } }; - - server_stream.set_nonblocking(true).expect("Failed to set non-blocking"); - - let server_stream = TcpStream::from_std(server_stream).expect("Failed to convert to tokio stream"); + + server_stream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + + let server_stream = + TcpStream::from_std(server_stream).expect("Failed to convert to tokio stream"); } } @@ -185,15 +202,22 @@ pub fn _listener_creation() -> Result { Ok(conf) => conf, Err(e) => { eprintln!("[WASM] > ERROR: {}", e); - return Err(std::io::Error::new(std::io::ErrorKind::Other, "failed to lock config")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to lock config", + )); } }; // FIXME: hardcoded the filename for now, make it a config later - let stream = StreamConfigV1::init(global_conn.config.local_address.clone(), global_conn.config.local_port, "LISTEN".to_string()); - + let stream = StreamConfigV1::init( + global_conn.config.local_address.clone(), + global_conn.config.local_port, + "LISTEN".to_string(), + ); + let encoded: Vec = bincode::serialize(&stream).expect("Failed to serialize"); - + let address = encoded.as_ptr() as u32; let size = encoded.len() as u32; @@ -203,10 +227,16 @@ pub fn _listener_creation() -> Result { }; if fd < 0 { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "failed to create listener")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to create listener", + )); } - info!("[WASM] ready to start listening at {}:{}", global_conn.config.local_address, global_conn.config.local_port); - + info!( + "[WASM] ready to start listening at {}:{}", + global_conn.config.local_address, global_conn.config.local_port + ); + Ok(fd) -} \ No newline at end of file +}