diff --git a/crates/macros/Cargo.toml b/crates/macros/Cargo.toml new file mode 100644 index 0000000..7906ad8 --- /dev/null +++ b/crates/macros/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "water_macros" +version = "0.0.1" +authors.workspace = true +description.workspace = true +edition.workspace = true + +[lib] +proc-macro = true +doctest=false + +[features] + +[dependencies] +proc-macro2 = "1.0.7" +quote = "1" +lazy_static = "~1.4.0" +syn = { version = "2.0", features = ["full"] } diff --git a/crates/macros/src/lib.rs b/crates/macros/src/lib.rs new file mode 100644 index 0000000..463771b --- /dev/null +++ b/crates/macros/src/lib.rs @@ -0,0 +1,131 @@ +#[allow(unused_extern_crates)] +extern crate proc_macro; +extern crate proc_macro2; + +extern crate quote; +extern crate syn; + +mod v0; +use v0::{entrypoint, impl_water_dial, impl_water_listen, impl_water_wrap}; + +use proc_macro::TokenStream; +use syn::{parse_macro_input, DeriveInput}; + +/// Marks async function to be executed by the selected runtime. This macro +/// helps set up a `Runtime` without requiring the user to use +/// [Runtime](../tokio/runtime/struct.Runtime.html) or +/// [Builder](../tokio/runtime/struct.Builder.html) directly. +/// +/// Note: This macro is designed to be simplistic and targets applications that +/// do not require a complex setup. If the provided functionality is not +/// sufficient, you may be interested in using +/// [Builder](../tokio/runtime/struct.Builder.html), which provides a more +/// powerful interface. +/// +/// Note: This macro can be used on any function and not just the `main` +/// function. Using it on a non-main function makes the function behave as if it +/// was synchronous by starting a new runtime each time it is called. If the +/// function is called often, it is preferable to create the runtime using the +/// runtime builder so the runtime can be reused across calls. +/// +/// # Non-worker async function +/// +/// Note that the async function marked with this macro does not run as a +/// worker. The expectation is that other tasks are spawned by the function here. +/// Awaiting on other futures from the function provided here will not +/// perform as fast as those spawned as workers. +/// +/// ``` +/// #[water_macros_v0::entry] +/// # fn entry() {} +/// ``` +/// ## Usage +/// +/// ### Using the multi-thread runtime +/// +/// ```rust +/// #[water_macros_v0::entry] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[water_macros_v0::entry]` +/// +/// ```rust +/// fn entry() { +/// tokio::runtime::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// ### Rename package +/// +/// ```rust +/// use water_macros_v0 as wtr0; +/// +/// #[water_macros_v0::entry(crate = "wtr0")] +/// fn entry() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[tokio::main]` +/// +/// ```rust +/// use water_macros_v0 as wtr0; +/// +/// fn entry() { +/// tokio1::runtime::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +#[proc_macro_attribute] +// #[cfg(not(test))] // Work around for rust-lang/rust#62127 +pub fn entry(args: TokenStream, item: TokenStream) -> TokenStream { + entrypoint(args.into(), item.into()).into() +} + +// // Disabled for now since Testing with C interface doesn't make a lot if sense yet. +// #[cfg(test)] +// #[proc_macro_attribute] +// pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { +// entry::test(args.into(), item.into()).into() +// } + +#[proc_macro_derive(WaterDialer)] +pub fn water_dial_macro(input: TokenStream) -> TokenStream { + // Parse the string representation + let ast = parse_macro_input!(input as DeriveInput); + + // Build the impl + impl_water_dial(&ast) +} + +#[proc_macro_derive(WaterListener)] +pub fn water_listen_macro(input: TokenStream) -> TokenStream { + // Parse the string representation + let ast = parse_macro_input!(input as DeriveInput); + + // Build the impl + impl_water_listen(&ast) +} + +#[proc_macro_derive(WaterWrapper)] +pub fn water_wrap_macro(input: TokenStream) -> TokenStream { + // Parse the string representation + let ast = parse_macro_input!(input as DeriveInput); + + // Build the impl + impl_water_wrap(&ast) +} diff --git a/crates/macros/src/v0/dial.rs b/crates/macros/src/v0/dial.rs new file mode 100644 index 0000000..69e3afb --- /dev/null +++ b/crates/macros/src/v0/dial.rs @@ -0,0 +1,64 @@ +use proc_macro::TokenStream; +use quote::{format_ident, quote}; + +pub(crate) fn impl_water_dial(ast: &syn::DeriveInput) -> TokenStream { + let name = &ast.ident; + let inst_name = &format_ident!("OBJ_{}", name.to_string().to_uppercase()); + + quote! { + // version-independent API + #[export_name = "_version"] + pub fn _version() -> i32 { + ::water_wasm::v0::VERSION + } + + // version-independent API + #[export_name = "_role"] + pub fn _role() -> i32 { + ::water_wasm::common::Role::Dialer as i32 + } + + + lazy_static! { + static ref #inst_name: std::sync::Arc< + std::sync::Mutex< + std::boxed::Box< dyn ::water_wasm::v0::ReadWriteDial + Sized>>> = { + let m = std::sync::Arc::new(std::sync::Mutex::new(<#name>::new())); + m + }; + } + + #[export_name = "_config"] + pub fn _config(fd: i32) -> i32 { + 0 + } + + #[export_name = "_dial"] + pub fn _dial(caller_conn_fd: i32) -> i32 { + println!("Dialing..."); + let mut obj = #inst_name.lock().unwrap(); + obj.dial(caller_conn_fd) + } + + + // V0 API + #[export_name = "_read"] + pub fn _read() -> i32 { + println!("Dialing..."); + let mut obj = #inst_name.lock().unwrap(); + // obj.Read(caller_conn_fd) + } + + // V0 API + #[export_name = "_write"] + pub fn _write() -> i32 { + 0 + } + + // V0 API + #[export_name = "_close"] + pub fn _close() { } + + } + .into() +} diff --git a/crates/macros/src/v0/entry.rs b/crates/macros/src/v0/entry.rs new file mode 100644 index 0000000..88a0034 --- /dev/null +++ b/crates/macros/src/v0/entry.rs @@ -0,0 +1,396 @@ +use proc_macro2::{Span, TokenStream, TokenTree}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::parse::{Parse, ParseStream, Parser}; +use syn::{braced, Attribute, Ident, Path, Signature, Visibility}; + +// syn::AttributeArgs does not implement syn::Parse +type AttributeArgs = syn::punctuated::Punctuated; + +struct FinalConfig { + crate_name: Option, +} + +/// Config used in case of the attribute not being able to build a valid config +const DEFAULT_ERROR_CONFIG: FinalConfig = FinalConfig { crate_name: None }; + +struct Configuration { + is_test: bool, // unused for now as testing with C interface doesn't make a lot of sense yet. + crate_name: Option, +} + +impl Configuration { + fn new(is_test: bool) -> Self { + Configuration { + is_test, + crate_name: None, + } + } + + fn set_crate_name(&mut self, name: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.crate_name.is_some() { + return Err(syn::Error::new(span, "`crate` set multiple times.")); + } + let name_path = parse_path(name, span, "crate")?; + self.crate_name = Some(name_path); + Ok(()) + } + + // Currently unused + fn macro_name(&self) -> &'static str { + if self.is_test { + "water_wasm::test" + } else { + "water_wasm::main" + } + } + + fn build(&self) -> Result { + Ok(FinalConfig { + crate_name: self.crate_name.clone(), + }) + } +} + +fn parse_path(lit: syn::Lit, span: Span, field: &str) -> Result { + match lit { + syn::Lit::Str(s) => { + let err = syn::Error::new( + span, + format!( + "Failed to parse value of `{}` as path: \"{}\"", + field, + s.value() + ), + ); + s.parse::().map_err(|_| err.clone()) + } + _ => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as path.", field), + )), + } +} + +fn build_config( + _input: &ItemFn, + args: AttributeArgs, + is_test: bool, +) -> Result { + let mut config = Configuration::new(is_test); + let _macro_name = config.macro_name(); + + for arg in args { + match arg { + syn::Meta::NameValue(namevalue) => { + let ident = namevalue + .path + .get_ident() + .ok_or_else(|| { + syn::Error::new_spanned(&namevalue, "Must have specified ident") + })? + .to_string() + .to_lowercase(); + let lit = match &namevalue.value { + syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit, + expr => return Err(syn::Error::new_spanned(expr, "Must be a literal")), + }; + match ident.as_str() { + "crate" => { + config.set_crate_name(lit.clone(), syn::spanned::Spanned::span(lit))?; + } + name => { + let msg = format!( + "Unknown attribute {} is specified; expected one of: `crate`", + name, + ); + return Err(syn::Error::new_spanned(namevalue, msg)); + } + } + } + syn::Meta::Path(path) => { + let name = path + .get_ident() + .ok_or_else(|| syn::Error::new_spanned(&path, "Must have specified ident"))? + .to_string() + .to_lowercase(); + let name = name.as_str(); + let msg = format!( + "Unknown attribute {} is specified; expected one of: `crate`", + name + ); + // let msg = match name.as_str() { + // // [Disabled] Options that are present in tokio, but not in water. Left as example. + // + // "threaded_scheduler" | "multi_thread" => { + // format!( + // "Set the runtime flavor with #[{}(flavor = \"multi_thread\")].", + // macro_name + // ) + // } + // "basic_scheduler" | "current_thread" | "single_threaded" => { + // format!( + // "Set the runtime flavor with #[{}(flavor = \"current_thread\")].", + // macro_name + // ) + // } + // "flavor" | "worker_threads" | "start_paused" => { + // format!("The `{}` attribute requires an argument.", name) + // } + // name => { + // format!( + // "Unknown attribute {} is specified; expected one of: `crate`", + // name + // ) + // } + // }; + return Err(syn::Error::new_spanned(path, msg)); + } + other => { + return Err(syn::Error::new_spanned( + other, + "Unknown attribute inside the macro", + )); + } + } + } + + config.build() +} + +fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenStream { + tokens.extend(error.into_compile_error()); + tokens +} + +pub(crate) fn entrypoint(args: TokenStream, item: TokenStream) -> TokenStream { + //Args unused for now, but could be used to add attributes to the macro + + // If any of the steps for this macro fail, we still want to expand to an item that is as close + // to the expected output as possible. This helps out IDEs such that completions and other + // related features keep working. + let input: ItemFn = match syn::parse2(item.clone()) { + Ok(it) => it, + Err(e) => return token_stream_with_error(item, e), + }; + + let config = if input.sig.ident == "entry" && !input.sig.inputs.is_empty() { + let msg = "the main function cannot accept arguments"; + Err(syn::Error::new_spanned(&input.sig.ident, msg)) + } else { + AttributeArgs::parse_terminated + .parse2(args) + .and_then(|args| build_config(&input, args, false)) + }; + + match config { + Ok(config) => expand_entrypoint(input, false, config), + Err(e) => token_stream_with_error(expand_entrypoint(input, false, DEFAULT_ERROR_CONFIG), e), + } +} + +fn expand_entrypoint(input: ItemFn, is_test: bool, config: FinalConfig) -> TokenStream { + // If type mismatch occurs, the current rustc points to the last statement. + let (last_stmt_start_span, last_stmt_end_span) = { + let mut last_stmt = input.stmts.last().cloned().unwrap_or_default().into_iter(); + + // `Span` on stable Rust has a limitation that only points to the first + // token, not the whole tokens. We can work around this limitation by + // using the first/last span of the tokens like + // `syn::Error::new_spanned` does. + let start = last_stmt.next().map_or_else(Span::call_site, |t| t.span()); + let end = last_stmt.last().map_or(start, |t| t.span()); + (start, end) + }; + + let crate_path = config + .crate_name + .map(ToTokens::into_token_stream) + .unwrap_or_else(|| Ident::new("water_wasm::v0", last_stmt_start_span).into_token_stream()); + + let rt = quote_spanned! {last_stmt_start_span=> + #crate_path::runtime::Builder::new_current_thread() + }; + + let header = if is_test { + quote! { + // version-independent API + #[::core::prelude::v1::test] + #[export_name = "_init"] + pub fn _init() {} + } + } else { + quote! { + #[export_name = "_init"] + pub fn _init() {} + } + }; + + let body_ident = quote! { body }; + let last_block = quote_spanned! {last_stmt_end_span=> + #[allow(clippy::expect_used, clippy::diverging_sub_expression)] + { + return #rt + .enable_all() + .build() + .expect("Failed building the Runtime") + .block_on(#body_ident); + } + }; + + let body = input.body(); + + // For test functions pin the body to the stack and use `Pin<&mut dyn + // Future>` to reduce the amount of `Runtime::block_on` (and related + // functions) copies we generate during compilation due to the generic + // parameter `F` (the future to block on). This could have an impact on + // performance, but because it's only for testing it's unlikely to be very + // large. + // + // We don't do this for the main function as it should only be used once so + // there will be no benefit. + let body = if is_test { + let output_type = match &input.sig.output { + // For functions with no return value syn doesn't print anything, + // but that doesn't work as `Output` for our boxed `Future`, so + // default to `()` (the same type as the function output). + syn::ReturnType::Default => quote! { () }, + syn::ReturnType::Type(_, ret_type) => quote! { #ret_type }, + }; + quote! { + let body = async #body; + #crate_path::pin!(body); + let body: ::std::pin::Pin<&mut dyn ::std::future::Future> = body; + } + } else { + quote! { + let body = async #body; + } + }; + + input.into_tokens(header, body, last_block) +} + +struct ItemFn { + outer_attrs: Vec, + vis: Visibility, + sig: Signature, + brace_token: syn::token::Brace, + inner_attrs: Vec, + stmts: Vec, +} + +impl ItemFn { + /// Access all attributes of the function item. + #[allow(dead_code)] + fn attrs(&self) -> impl Iterator { + self.outer_attrs.iter().chain(self.inner_attrs.iter()) + } + + /// Get the body of the function item in a manner so that it can be + /// conveniently used with the `quote!` macro. + fn body(&self) -> Body<'_> { + Body { + brace_token: self.brace_token, + stmts: &self.stmts, + } + } + + /// Convert our local function item into a token stream. + fn into_tokens( + self, + header: proc_macro2::TokenStream, + body: proc_macro2::TokenStream, + last_block: proc_macro2::TokenStream, + ) -> TokenStream { + let mut tokens = proc_macro2::TokenStream::new(); + header.to_tokens(&mut tokens); + + // Outer attributes are simply streamed as-is. + for attr in self.outer_attrs { + attr.to_tokens(&mut tokens); + } + + // Inner attributes require extra care, since they're not supported on + // blocks (which is what we're expanded into) we instead lift them + // outside of the function. This matches the behavior of `syn`. + for mut attr in self.inner_attrs { + attr.style = syn::AttrStyle::Outer; + attr.to_tokens(&mut tokens); + } + + self.vis.to_tokens(&mut tokens); + self.sig.to_tokens(&mut tokens); + + self.brace_token.surround(&mut tokens, |tokens| { + body.to_tokens(tokens); + last_block.to_tokens(tokens); + }); + + tokens + } +} + +impl Parse for ItemFn { + #[inline] + fn parse(input: ParseStream<'_>) -> syn::Result { + // This parse implementation has been largely lifted from `syn`, with + // the exception of: + // * We don't have access to the plumbing necessary to parse inner + // attributes in-place. + // * We do our own statements parsing to avoid recursively parsing + // entire statements and only look for the parts we're interested in. + + let outer_attrs = input.call(Attribute::parse_outer)?; + let vis: Visibility = input.parse()?; + let sig: Signature = input.parse()?; + + let content; + let brace_token = braced!(content in input); + let inner_attrs = Attribute::parse_inner(&content)?; + + let mut buf = proc_macro2::TokenStream::new(); + let mut stmts = Vec::new(); + + while !content.is_empty() { + if let Some(semi) = content.parse::>()? { + semi.to_tokens(&mut buf); + stmts.push(buf); + buf = proc_macro2::TokenStream::new(); + continue; + } + + // Parse a single token tree and extend our current buffer with it. + // This avoids parsing the entire content of the sub-tree. + buf.extend([content.parse::()?]); + } + + if !buf.is_empty() { + stmts.push(buf); + } + + Ok(Self { + outer_attrs, + vis, + sig, + brace_token, + inner_attrs, + stmts, + }) + } +} + +struct Body<'a> { + brace_token: syn::token::Brace, + // Statements, with terminating `;`. + stmts: &'a [TokenStream], +} + +impl ToTokens for Body<'_> { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + self.brace_token.surround(tokens, |tokens| { + for stmt in self.stmts { + stmt.to_tokens(tokens); + } + }) + } +} diff --git a/crates/macros/src/v0/listen.rs b/crates/macros/src/v0/listen.rs new file mode 100644 index 0000000..3cc5aa6 --- /dev/null +++ b/crates/macros/src/v0/listen.rs @@ -0,0 +1,7 @@ +use proc_macro::TokenStream; +use quote::quote; + +pub(crate) fn impl_water_listen(ast: &syn::DeriveInput) -> TokenStream { + let _name = &ast.ident; + quote! {}.into() +} diff --git a/crates/macros/src/v0/mod.rs b/crates/macros/src/v0/mod.rs new file mode 100644 index 0000000..8884b6a --- /dev/null +++ b/crates/macros/src/v0/mod.rs @@ -0,0 +1,9 @@ +mod dial; +mod entry; +mod listen; +mod wrap; + +pub(crate) use self::dial::impl_water_dial; +pub(crate) use self::entry::entrypoint; +pub(crate) use self::listen::impl_water_listen; +pub(crate) use self::wrap::impl_water_wrap; diff --git a/crates/macros/src/v0/wrap.rs b/crates/macros/src/v0/wrap.rs new file mode 100644 index 0000000..01702a9 --- /dev/null +++ b/crates/macros/src/v0/wrap.rs @@ -0,0 +1,64 @@ +use proc_macro::TokenStream; +use quote::{format_ident, quote}; + +pub(crate) fn impl_water_wrap(ast: &syn::DeriveInput) -> TokenStream { + let name = &ast.ident; + let inst_name = &format_ident!("OBJ_{}", name.to_string().to_uppercase()); + + quote! { + // version-independent API + #[export_name = "_version"] + pub fn _version() -> i32 { + ::water_wasm::v0::VERSION + } + + // version-independent API + #[export_name = "_role"] + pub fn _role() -> i32 { + ::water_wasm::common::Role::Wrapper as i32 + } + + + lazy_static! { + static ref #inst_name: std::sync::Arc< + std::sync::Mutex< + std::boxed::Box>> = { + let mut m = std::sync::Arc::new(std::sync::Mutex::new(<#name>::new())); + m + }; + } + + #[export_name = "_water_config"] + pub fn _config(fd: i32) -> i32 { + + } + + #[export_name = "_water_dial"] + pub fn _dial(caller_conn_fd: i32) -> i32 { + println!("Dialing..."); + let mut obj = #inst_name.lock().unwrap(); + obj.dial(caller_conn_fd) + } + + + // V0 API + #[export_name = "_water_read"] + pub fn _read() -> i32 { + println!("Dialing..."); + let mut obj = #inst_name.lock().unwrap(); + // obj.Read(caller_conn_fd) + } + + // V0 API + #[export_name = "_write"] + pub fn _write() -> i32 { + 0 + } + + // V0 API + #[export_name = "_close"] + pub fn _close() { } + + } + .into() +} diff --git a/examples/water_bins/echo_client_macro/Cargo.toml b/examples/water_bins/echo_client_macro/Cargo.toml new file mode 100644 index 0000000..3cb2fd3 --- /dev/null +++ b/examples/water_bins/echo_client_macro/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "echo-client_macro" +version = "0.1.0" +authors.workspace = true +description.workspace = true +edition.workspace = true +publish = false + +[lib] +name = "echo_client_macro" +path = "src/lib.rs" +crate-type = ["cdylib"] + +[dependencies] +tokio = { version = "1.24.2", default-features = false, features = ["rt", "macros", "io-util", "time", "sync"] } +tokio-util = { version = "0.7.1", features = ["codec"] } + +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.107" +bincode = "1.3" + +anyhow = "1.0.7" +tracing = "0.1" +tracing-subscriber = "0.3.17" +toml = "0.5.9" +lazy_static = "1.4" +url = { version = "2.2.2", features = ["serde"] } +libc = "0.2.147" + +# water wasm lib import +water-wasm = { path = "../../../crates/wasm/", version = "0.1.0" } diff --git a/examples/water_bins/echo_client_macro/proxy.wasm b/examples/water_bins/echo_client_macro/proxy.wasm new file mode 100644 index 0000000..05afb34 Binary files /dev/null and b/examples/water_bins/echo_client_macro/proxy.wasm differ diff --git a/examples/water_bins/echo_client_macro/src/async_socks5_listener.rs b/examples/water_bins/echo_client_macro/src/async_socks5_listener.rs new file mode 100644 index 0000000..ddac8f7 --- /dev/null +++ b/examples/water_bins/echo_client_macro/src/async_socks5_listener.rs @@ -0,0 +1,302 @@ +use anyhow::Result; +use bincode::{self}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; +use tracing::info; + +use std::net::{SocketAddr, ToSocketAddrs}; +use std::{os::fd::FromRawFd, vec}; + +use crate::{StreamConfigV1, DIALER}; +use water_wasm::{ConnStream, Dialer}; + +// ----------------------- Listener methods ----------------------- +#[export_name = "_water_listen_v1"] +fn listen() { + wrapper().unwrap(); +} + +fn _listener_creation() -> Result { + let global_conn = match DIALER.lock() { + Ok(conf) => conf, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to lock config", + )); + } + }; + + // FIXME: hardcoded the filename for now, make it a config later + let stream = StreamConfigV1::init( + global_conn.config.local_address.clone(), + global_conn.config.local_port, + "LISTEN".to_string(), + ); + + let encoded: Vec = bincode::serialize(&stream).expect("Failed to serialize"); + + let address = encoded.as_ptr() as u32; + let size = encoded.len() as u32; + + let fd = unsafe { water_wasm::net::c::create_listen(address, size) }; + + if fd < 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to create listener", + )); + } + + info!( + "[WASM] ready to start listening at {}:{}", + global_conn.config.local_address, global_conn.config.local_port + ); + + Ok(fd) +} + +#[tokio::main(flavor = "current_thread")] +async fn wrapper() -> std::io::Result<()> { + let fd = _listener_creation().unwrap(); + + // Set up pre-established listening socket. + let standard = unsafe { std::net::TcpListener::from_raw_fd(fd) }; + // standard.set_nonblocking(true).unwrap(); + let listener = TcpListener::from_std(standard)?; + + info!("[WASM] Starting to listen..."); + + loop { + // Accept new sockets in a loop. + let socket = match listener.accept().await { + Ok(s) => s.0, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + continue; + } + }; + + // Spawn a background task for each new connection. + tokio::spawn(async move { + eprintln!("[WASM] > CONNECTED"); + match handle_incoming(socket).await { + Ok(()) => eprintln!("[WASM] > DISCONNECTED"), + Err(e) => eprintln!("[WASM] > ERROR: {}", e), + } + }); + } +} + +// SS handle incoming connections +async fn handle_incoming(mut stream: TcpStream) -> std::io::Result<()> { + let mut buffer = [0; 512]; + + // Read the SOCKS5 greeting + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); + + println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); + + if nbytes < 2 || buffer[0] != 0x05 { + eprintln!("Not a SOCKS5 request"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Not a SOCKS5 request", + )); + } + + let nmethods = buffer[1] as usize; + if nbytes < 2 + nmethods { + eprintln!("Incomplete SOCKS5 greeting"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete SOCKS5 greeting", + )); + } + + // For simplicity, always use "NO AUTHENTICATION REQUIRED" + stream + .write_all(&[0x05, 0x00]) + .await + .expect("Failed to write to stream"); + + // Read the actual request + let nbytes = stream + .read(&mut buffer) + .await + .expect("Failed to read from stream"); + + println!("Received {} bytes: {:?}", nbytes, buffer[..nbytes].to_vec()); + + if nbytes < 7 || buffer[0] != 0x05 || buffer[1] != 0x01 { + println!("Invalid SOCKS5 request"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid SOCKS5 request", + )); + } + + // Extract address and port + let addr: SocketAddr = match buffer[3] { + 0x01 => { + // IPv4 + if nbytes < 10 { + eprintln!("Incomplete request for IPv4 address"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for IPv4 address", + )); + } + let addr = std::net::Ipv4Addr::new(buffer[4], buffer[5], buffer[6], buffer[7]); + let port = u16::from_be_bytes([buffer[8], buffer[9]]); + SocketAddr::from((addr, port)) + } + 0x03 => { + // Domain name + let domain_length = buffer[4] as usize; + if nbytes < domain_length + 5 { + eprintln!("Incomplete request for domain name"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Incomplete request for domain name", + )); + } + let domain = + std::str::from_utf8(&buffer[5..5 + domain_length]).expect("Invalid domain string"); + + println!("Domain: {}", domain); + + let port = + u16::from_be_bytes([buffer[5 + domain_length], buffer[5 + domain_length + 1]]); + + println!("Port: {}", port); + + let domain_with_port = format!("{}:443", domain); // Assuming HTTPS + + // domain.to_socket_addrs().unwrap().next().unwrap() + match domain_with_port.to_socket_addrs() { + Ok(mut addrs) => match addrs.next() { + Some(addr) => addr, + None => { + eprintln!("Domain resolved, but no addresses found for {}", domain); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Domain resolved, but no addresses found for {}", domain), + )); + } + }, + Err(e) => { + eprintln!("Failed to resolve domain {}: {}", domain, e); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!("Failed to resolve domain {}: {}", domain, e), + )); + } + } + } + _ => { + eprintln!("Address type not supported"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Address type not supported", + )); + } + }; + + // NOTE: create a new Dialer to dial any target address as it wants to + // Add more features later -- connect to target thru rules (direct / server) + + // Connect to target address + let mut tcp_dialer = Dialer::new(); + tcp_dialer.config.remote_address = addr.ip().to_string(); + tcp_dialer.config.remote_port = addr.port(); + + tcp_dialer.dial().expect("Failed to dial"); + + let target_stream = match tcp_dialer.file_conn.outbound_conn.file.unwrap() { + ConnStream::TcpStream(s) => s, + _ => { + eprintln!("Failed to get outbound tcp stream"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Failed to get outbound tcp stream", + )); + } + }; + + target_stream + .set_nonblocking(true) + .expect("Failed to set non-blocking"); + + let target_stream = + TcpStream::from_std(target_stream).expect("Failed to convert to tokio stream"); + + // Construct the response based on the target address + let response = match addr { + SocketAddr::V4(a) => { + let mut r = vec![0x05, 0x00, 0x00, 0x01]; + r.extend_from_slice(&a.ip().octets()); + r.extend_from_slice(&a.port().to_be_bytes()); + r + } + SocketAddr::V6(a) => { + let mut r = vec![0x05, 0x00, 0x00, 0x04]; + r.extend_from_slice(&a.ip().octets()); + r.extend_from_slice(&a.port().to_be_bytes()); + r + } + }; + + stream + .write_all(&response) + .await + .expect("Failed to write to stream"); + + let (mut client_read, mut client_write) = tokio::io::split(stream); + let (mut target_read, mut target_write) = tokio::io::split(target_stream); + + let client_to_target = async move { + let mut buffer = vec![0; 4096]; + loop { + match client_read.read(&mut buffer).await { + Ok(0) => { + break; + } + Ok(n) => { + if (target_write.write_all(&buffer[0..n]).await).is_err() { + break; + } + } + Err(_) => break, + } + } + }; + + let target_to_client = async move { + let mut buffer = vec![0; 4096]; + loop { + match target_read.read(&mut buffer).await { + Ok(0) => { + break; + } + Ok(n) => { + if (client_write.write_all(&buffer[0..n]).await).is_err() { + break; + } + } + Err(_) => break, + } + } + }; + + // Run both handlers concurrently + tokio::join!(client_to_target, target_to_client); + + Ok(()) +} diff --git a/examples/water_bins/echo_client_macro/src/lib.rs b/examples/water_bins/echo_client_macro/src/lib.rs new file mode 100644 index 0000000..214be88 --- /dev/null +++ b/examples/water_bins/echo_client_macro/src/lib.rs @@ -0,0 +1,160 @@ +// =================== Imports & Modules ===================== +use std::{io::Read, os::fd::FromRawFd, sync::Mutex}; + +use lazy_static::lazy_static; +use tracing::{info, Level}; + +use water_wasm::*; + +pub mod async_socks5_listener; + +// create a mutable global variable stores a pointer to the config +lazy_static! { + static ref DIALER: Mutex = Mutex::new(Dialer::new()); + // static ref CONN: Mutex = Mutex::new(Connection::new()); +} + +#[cfg(target_family = "wasm")] +#[export_name = "_water_init"] +pub fn _init(debug: bool) { + if debug { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + } + + info!("[WASM] running in _init"); +} + +#[cfg(not(target_family = "wasm"))] +pub fn _init(debug: bool) { + if debug { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + } + + info!("[WASM] running in _init"); +} + +#[export_name = "_water_set_inbound"] +pub fn _water_bridging(fd: i32) { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + global_dialer.file_conn.set_inbound( + fd, + ConnStream::File(unsafe { std::fs::File::from_raw_fd(fd) }), + ); +} + +#[export_name = "_water_set_outbound"] +pub fn _water_bridging_out(fd: i32) { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + global_dialer.file_conn.set_outbound( + fd, + ConnStream::TcpStream(unsafe { std::net::TcpStream::from_raw_fd(fd) }), + ); +} + +#[export_name = "_water_config"] +pub fn _process_config(fd: i32) { + info!("[WASM] running in _process_config"); + + let mut config_file = unsafe { std::fs::File::from_raw_fd(fd) }; + let mut config = String::new(); + match config_file.read_to_string(&mut config) { + Ok(_) => { + let config: Config = match serde_json::from_str(&config) { + Ok(config) => config, + Err(e) => { + eprintln!("[WASM] > _process_config ERROR: {}", e); + return; + } + }; + + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return; + } + }; + + // global_dialer.file_conn.config = config.clone(); + global_dialer.config = config; + } + Err(e) => { + eprintln!( + "[WASM] > WASM _process_config failed reading path ERROR: {}", + e + ); + } + }; +} + +#[export_name = "_water_write"] +pub fn _write(bytes_write: i64) -> i64 { + let mut global_dialer = match DIALER.lock() { + Ok(dialer) => dialer, + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + return -1; + } + }; + + match global_dialer + .file_conn + ._write_2_outbound(&mut DefaultEncoder, bytes_write) + { + Ok(n) => n, + Err(e) => { + eprintln!("[WASM] > ERROR in _write: {}", e); + -1 + } + } +} + +#[export_name = "_water_read"] +pub fn _read() -> i64 { + match DIALER.lock() { + Ok(mut global_dialer) => { + match global_dialer + .file_conn + ._read_from_outbound(&mut DefaultDecoder) + { + Ok(n) => n, + Err(e) => { + eprintln!("[WASM] > ERROR in _read: {}", e); + -1 + } + } + } + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + -1 + } + } +} + +#[export_name = "_water_dial"] +pub fn _dial() { + match DIALER.lock() { + Ok(mut global_dialer) => { + if let Err(e) = global_dialer.dial() { + eprintln!("[WASM] > ERROR in _dial: {}", e); + } + } + Err(e) => { + eprintln!("[WASM] > ERROR: {}", e); + } + } +}