Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0 (plus) API and spec #6

Merged
merged 29 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0063bb6
update: change namespaces to prefix
Oct 23, 2023
582d24d
update: v0+ API matching + plain runnable with Dialer
Oct 30, 2023
0302fa0
update: lint
Oct 30, 2023
d31964c
fix: fmt
Oct 30, 2023
400eb32
fix: clippy
Oct 30, 2023
177cbc0
fix: resolve conflicts from the prev PR
Nov 1, 2023
f2f1d72
update: keep the library code safe
Nov 1, 2023
fdec4f1
update: cleaning code + more sensible comments
erikziyunchi Nov 5, 2023
330f987
fix: clippy
erikziyunchi Nov 5, 2023
1e5f376
update: remove dup .wasm + link go-side runner for plain.wasm & PR fo…
erikziyunchi Nov 5, 2023
493061d
update: adding listener(base), tests, refactoring
erikziyunchi Nov 9, 2023
8447289
fix: fmt
erikziyunchi Nov 9, 2023
044eff6
fix: clippy
erikziyunchi Nov 9, 2023
cfddea1
fix: conflict
erikziyunchi Nov 9, 2023
20b832d
update: add relay, test
erikziyunchi Nov 10, 2023
8bbc3c7
fix: sleep 1 sec before terminating relay to allow the test finish + …
erikziyunchi Nov 10, 2023
d8bfa5f
fix: clippy & give a bit more time before terminate for relay test
erikziyunchi Nov 10, 2023
c2010a5
fix: isolate relay test with another test file
erikziyunchi Nov 10, 2023
3f569a3
fix: fmt
erikziyunchi Nov 10, 2023
90b5d93
fix: change port in relay test
erikziyunchi Nov 10, 2023
f6e7b46
update: add multi listener for v0
erikziyunchi Dec 18, 2023
cd43711
update: lint
erikziyunchi Dec 18, 2023
9e38b7a
fix: resolve conflict
erikziyunchi Dec 19, 2023
aa3bed6
fix: lint
erikziyunchi Dec 19, 2023
b169832
fix: new ss wasm binary + change port in test
erikziyunchi Dec 19, 2023
664e389
fix: fmt
erikziyunchi Dec 19, 2023
d06a6bc
update: add source code for plain & reverse; common code crate for WA…
erikziyunchi Jan 6, 2024
ccde8cd
update: do dns query at test, added todo for a exported host function…
erikziyunchi Jan 6, 2024
0e2fafa
fix: lint
erikziyunchi Jan 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,9 @@ jobs:

# Build for wasm32-wasi target
- name: Build wasm32-wasi Target
run: |
for member in crates/wasm/ examples/water_bins/ss_client_wasm_v1/ examples/water_bins/echo_client/; do
cargo build --verbose --manifest-path $member/Cargo.toml --target wasm32-wasi
done
run: bash ./scripts/build_wasm_targets.sh
env:
RUSTFLAGS: --cfg tokio_unstable

- name: Test
run: cargo test --verbose --workspace --all-features
run: cargo test --verbose --workspace --all-features
5 changes: 5 additions & 0 deletions crates/wasm_v0/.cargo/config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[build]
target = "wasm32-wasi"

[target.wasm32-wasi]
rustflags = [ "--cfg", "tokio_unstable"]
28 changes: 28 additions & 0 deletions crates/wasm_v0/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "water-wasm-v0"
version = "0.1.0"
authors.workspace = true
description.workspace = true
edition.workspace = true

[lib]
name = "water_wasm_v0"
path = "src/lib.rs"
crate-type = ["cdylib", "lib"]

[dependencies]
tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] }
tokio-util = { version = "0.7.1", features = ["codec"] }

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.107"
bincode = "1.3"

anyhow = "1.0.7"
tracing = "0.1"
tracing-subscriber = "0.3.17"
toml = "0.5.9"
lazy_static = "1.4"
url = { version = "2.2.2", features = ["serde"] }
libc = "0.2.147"

96 changes: 96 additions & 0 deletions crates/wasm_v0/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::os::fd::FromRawFd;
use tokio::net::TcpStream;

// WASI Imports
extern "C" {
pub fn host_accept() -> i32; // obtain a connection (specified by returned fd) accepted by the host
pub fn host_dial() -> i32; // obtain a connection (specified by returned fd) dialed by the host
pub fn host_defer(); // call when exiting
#[allow(dead_code)]
pub fn pull_config() -> i32; // obtain a configuration file (specified by returned fd) from the host
}

// enumerated constants for Role (i32)
// 0: unknown
// 1: dialer
// 2: listener
// 3: relay
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Role {
Unknown = 0,
Dialer = 1,
Listener = 2,
Relay = 3,
}

pub struct AsyncFdConn {
fd: i32,
temp_stream: Option<std::net::TcpStream>, // used to hold the std tcp stream, will be upgraded to tokio stream later
stream: Option<TcpStream>,
}

impl Default for AsyncFdConn {
fn default() -> Self {
Self::new()
}
}

impl AsyncFdConn {
pub fn new() -> Self {
AsyncFdConn {
fd: -1,
temp_stream: None,
stream: None,
}
}

pub fn wrap(&mut self, fd: i32) -> Result<(), String> {
if self.fd > 0 {
return Err("already wrapped".to_string());
}
if fd < 0 {
return Err("invalid fd".to_string());
}
self.fd = fd;
println!("wrap: fd = {}", fd);
let stdstream = unsafe { std::net::TcpStream::from_raw_fd(fd) };

self.temp_stream = Some(stdstream);
// println!("wrap: stdstream = {:?}", stdstream);
// stdstream
// .set_nonblocking(true)
// .expect("Failed to set non-blocking");

// println!("wrap: stream = {:?}", stdstream);
// self.stream =
// Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream"));
// Ok(())
Ok(())
}

pub fn tokio_upgrade(&mut self) -> Result<(), String> {
if self.fd < 0 {
return Err("invalid fd".to_string());
}
let stdstream = self.temp_stream.take().unwrap();
stdstream
.set_nonblocking(true)
.expect("Failed to set non-blocking");
self.stream =
Some(TcpStream::from_std(stdstream).expect("Failed to convert to tokio stream"));
Ok(())
}

pub fn close(&mut self) {
if self.fd < 0 {
return;
}
let stream = self.stream.take().unwrap();
drop(stream);
self.fd = -1;
}

pub fn stream(&mut self) -> Option<&mut TcpStream> {
self.stream.as_mut()
}
}
20 changes: 20 additions & 0 deletions crates/wasm_v0/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Error is a enum in i32
#[allow(dead_code)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum Error {
None = 0,
Unknown = -1, // general error
InvalidArgument = -2, // invalid argument supplied to func call
InvalidConfig = -3, // config file provided is invalid
InvalidFd = -4, // invalid file descriptor provided
InvalidFunction = -5, // invalid function called
DoubleInit = -6, // initializing twice
FailedIO = -7, // Failing an I/O operation
NotInitialized = -8, // not initialized
}

impl Error {
pub fn i32(&self) -> i32 {
*self as i32
}
}
3 changes: 3 additions & 0 deletions crates/wasm_v0/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod common;
pub mod error;
pub mod v0plus;
220 changes: 220 additions & 0 deletions crates/wasm_v0/src/v0plus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use crate::{common::*, error};

pub const VERSION: i32 = 0x00000000; // v0plus share the same version number with v0

pub struct Dialer {
caller_conn: AsyncFdConn,
remote_conn: AsyncFdConn,
}

pub struct Listener {
caller_conn: AsyncFdConn,
source_conn: AsyncFdConn,
}
pub struct Relay {
source_conn: AsyncFdConn,
remote_conn: AsyncFdConn,
}

impl Default for Dialer {
fn default() -> Self {
Self::new()
}
}

impl Dialer {
pub fn new() -> Self {
Dialer {
caller_conn: AsyncFdConn::new(),
remote_conn: AsyncFdConn::new(),
}
}

pub fn dial(&mut self, caller_conn_fd: i32) -> Result<i32, String> {
// check if caller_conn_fd is valid
if caller_conn_fd < 0 {
return Err("invalid caller_conn_fd".to_string());
}
match self.caller_conn.wrap(caller_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external dial() to get remote_conn_fd
let remote_conn_fd = unsafe { host_dial() };
if remote_conn_fd < 0 {
return Err("dial failed".to_string());
}
match self.remote_conn.wrap(remote_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return remote_conn_fd
Ok(remote_conn_fd)
}

// // borrow self.caller_conn
// pub fn caller(&mut self) -> Option<&mut TcpStream> {
// self.caller_conn.stream()
// }

// // borrow self.remote_conn
// pub fn remote(&mut self) -> Option<&mut TcpStream> {
// self.remote_conn.stream()
// }

pub fn close(&mut self) {
self.caller_conn.close();
self.remote_conn.close();
unsafe { host_defer() };
}
}

impl Default for Listener {
fn default() -> Self {
Self::new()
}
}

impl Listener {
pub fn new() -> Self {
Listener {
caller_conn: AsyncFdConn::new(),
source_conn: AsyncFdConn::new(),
}
}

pub fn accept(&mut self, caller_conn_fd: i32) -> Result<i32, String> {
// check if caller_conn_fd is valid
if caller_conn_fd < 0 {
return Err("Listener: invalid caller_conn_fd".to_string());
}

match self.caller_conn.wrap(caller_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external accept() to get source_conn_fd
let source_conn_fd = unsafe { host_accept() };
if source_conn_fd < 0 {
return Err("Listener: accept failed".to_string());
}

match self.source_conn.wrap(source_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return source_conn_fd
Ok(source_conn_fd)
}

// // borrow self.caller_conn
// pub fn caller(&mut self) -> Option<&mut TcpStream> {
// self.caller_conn.stream()
// }

// // borrow self.source_conn
// pub fn source(&mut self) -> Option<&mut TcpStream> {
// self.source_conn.stream()
// }

pub fn close(&mut self) {
self.caller_conn.close();
self.source_conn.close();
unsafe { host_defer() };
}
}

impl Default for Relay {
fn default() -> Self {
Self::new()
}
}

impl Relay {
pub fn new() -> Self {
Relay {
source_conn: AsyncFdConn::new(),
remote_conn: AsyncFdConn::new(),
}
}

pub fn associate(&mut self) -> Result<i32, String> {
// call external accept() to get source_conn_fd
let source_conn_fd = unsafe { host_accept() };
if source_conn_fd < 0 {
return Err("Relay: accept failed".to_string());
}

match self.source_conn.wrap(source_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// call external dial() to get remote_conn_fd
let remote_conn_fd = unsafe { host_dial() };
if remote_conn_fd < 0 {
return Err("Relay: dial failed".to_string());
}
match self.remote_conn.wrap(remote_conn_fd) {
Ok(_) => {}
Err(e) => {
return Err(e);
}
}

// return remote_conn_fd
Ok(error::Error::None.i32())
}

// // borrow self.source_conn
// pub fn source(&mut self) -> Option<&mut TcpStream> {
// self.source_conn.stream()
// }

// // borrow self.remote_conn
// pub fn remote(&mut self) -> Option<&mut TcpStream> {
// self.remote_conn.stream()
// }

pub fn close(&mut self) {
self.source_conn.close();
self.remote_conn.close();
unsafe { host_defer() };
}
}

pub trait ConnPair {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)>;
}

impl ConnPair for Dialer {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.caller_conn, &mut self.remote_conn))
}
}

impl ConnPair for Listener {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.caller_conn, &mut self.source_conn))
}
}

impl ConnPair for Relay {
fn conn_pair(&mut self) -> Option<(&mut AsyncFdConn, &mut AsyncFdConn)> {
Some((&mut self.source_conn, &mut self.remote_conn))
}
}
Loading
Loading