Skip to content

Commit

Permalink
feat: implement write op on windows
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Feb 7, 2024
1 parent 46c921f commit 0d41c53
Show file tree
Hide file tree
Showing 14 changed files with 243 additions and 83 deletions.
42 changes: 24 additions & 18 deletions .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ if [ "${CROSS}" = "1" ]; then
CARGO=cross
fi

CARGO_TEST_FLAGS=
if [ "${NO_RUN}" = "1" ]; then
CARGO_TEST_FLAGS=--no-run
fi

# If a test crashes, we want to know which one it was.
export RUST_TEST_THREADS=1
export RUST_BACKTRACE=1
Expand All @@ -19,38 +24,39 @@ export RUST_BACKTRACE=1
cd "${PROJECT_DIR}"/monoio

# only enable legacy driver
"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils"
"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" --release
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils"
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features "async-cancel,bytes,legacy,macros,utils" --release

if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ] || [ "${TARGET}" = "i686-unknown-linux-gnu" ]; then

# only enabled uring driver
"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils"
"${CARGO}" test --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" --release
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils"
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features "async-cancel,bytes,iouring,macros,utils" --release
fi

if [ "${TARGET}" != "aarch64-unknown-linux-gnu" ] && [ "${TARGET}" != "armv7-unknown-linux-gnueabihf" ] &&
[ "${TARGET}" != "riscv64gc-unknown-linux-gnu" ] && [ "${TARGET}" != "s390x-unknown-linux-gnu" ]; then
# enable uring+legacy driver
"${CARGO}" test --target "${TARGET}"
"${CARGO}" test --target "${TARGET}" --release

if [ "${CHANNEL}" == "nightly" ]; then
"${CARGO}" test --target "${TARGET}" --all-features
"${CARGO}" test --target "${TARGET}" --all-features --release
fi
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}"
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --release
fi

if [ "${CHANNEL}" == "nightly" ] && ( [ "${TARGET}" = "x86_64-unknown-linux-gnu" ] || [ "${TARGET}" = "i686-unknown-linux-gnu" ] ); then
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --all-features
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --all-features --release
fi

# test monoio-compat mod
cd "${PROJECT_DIR}"/monoio-compat

"${CARGO}" test --target "${TARGET}"
"${CARGO}" test --target "${TARGET}" --release
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}"
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --release

"${CARGO}" test --target "${TARGET}" --no-default-features --features hyper
"${CARGO}" test --target "${TARGET}" --no-default-features --features hyper --release
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features hyper
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --no-default-features --features hyper --release

if [ "${CHANNEL}" == "nightly" ]; then
"${CARGO}" test --target "${TARGET}" --all-features
"${CARGO}" test --target "${TARGET}" --all-features --release
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --all-features
"${CARGO}" test $CARGO_TEST_FLAGS --target "${TARGET}" --all-features --release
fi

# todo maybe we should test examples here ?
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ jobs:
TARGET: ${{ matrix.target }}
OS: ${{ matrix.os }}
PROJECT_DIR: ${{ github.workspace }}
NO_RUN: ${{ matrix.no_run }}
run: sh .github/workflows/ci.sh

strategy:
Expand Down Expand Up @@ -102,9 +103,13 @@ jobs:
# unsupported yet
# - target: x86_64-pc-windows-msvc
# os: windows-latest
# no_run: 1
# - target: x86_64-pc-windows-gnu
# os: windows-latest
# no_run: 1
# - target: i686-pc-windows-msvc
# os: windows-latest
# no_run: 1
# - target: i686-pc-windows-gnu
# os: windows-latest
# no_run: 1
13 changes: 6 additions & 7 deletions monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,22 @@ memchr = "2.7"
bytes = { version = "1", optional = true }
flume = { version = "0.11", optional = true }
mio = { version = "0.8", features = [
"net",
"os-poll",
"os-ext",
"net",
"os-poll",
"os-ext",
], optional = true }
threadpool = { version = "1", optional = true }
tokio = { version = "1", default-features = false, optional = true }
tracing = { version = "0.1", default-features = false, features = [
"std",
"std",
], optional = true }
ctrlc = { version = "3", optional = true }
lazy_static = { version = "1", optional = true }
once_cell = { version = "1.19.0", optional = true }

# windows dependencies(will be added when windows support finished)
[target.'cfg(windows)'.dependencies.windows-sys]
features = ["Win32_Foundation", "Win32_Networking_WinSock"]
version = "0.48.0"
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_System_IO"] }

# unix dependencies
[target.'cfg(unix)'.dependencies]
Expand Down
4 changes: 2 additions & 2 deletions monoio/src/buf/raw_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ unsafe impl IoVecBufMut for RawBufVectored {

#[cfg(windows)]
#[inline]
fn write_wsabuf_ptr(&self) -> *mut WSABUF {
fn write_wsabuf_ptr(&mut self) -> *mut WSABUF {
self.ptr as *mut WSABUF
}

#[cfg(windows)]
#[inline]
fn write_wsabuf_len(&self) -> usize {
fn write_wsabuf_len(&mut self) -> usize {
self.len
}

Expand Down
10 changes: 6 additions & 4 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use std::os::unix::prelude::AsRawFd;
use std::{
io,
mem::{size_of, MaybeUninit},
Expand All @@ -13,12 +15,12 @@ use {
accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE,
},
};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};

use super::{super::shared_fd::SharedFd, Op, OpAble};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::driver::ready::Direction;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::syscall_u32;

/// Accept
pub(crate) struct Accept {
Expand Down Expand Up @@ -62,7 +64,7 @@ impl OpAble for Accept {
.build()
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))]
#[cfg(any(feature = "legacy", feature = "poll-io"))]
#[inline]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
self.fd.registered_index().map(|idx| (Direction::Read, idx))
Expand All @@ -77,7 +79,7 @@ impl OpAble for Accept {
syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET)
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
let addr = self.addr.0.as_mut_ptr() as *mut _;
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/op/fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl OpAble for Fsync {
)
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
fn legacy_call(&mut self) -> io::Result<u32> {
#[cfg(target_os = "linux")]
if self.data_sync {
Expand Down
11 changes: 6 additions & 5 deletions monoio/src/driver/op/read.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::io;
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use std::os::unix::prelude::AsRawFd;
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use std::os::windows::io::AsRawSocket;

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use {
crate::{driver::ready::Direction, syscall_u32},
std::os::unix::prelude::AsRawFd,
};

use super::{super::shared_fd::SharedFd, Op, OpAble};
use crate::{
buf::{IoBufMut, IoVecBufMut},
BufResult,
};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::{driver::ready::Direction, syscall_u32};

pub(crate) struct Read<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
Expand Down
20 changes: 13 additions & 7 deletions monoio/src/driver/op/recv.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use std::os::unix::prelude::AsRawFd;
use std::{
io,
mem::{transmute, MaybeUninit},
Expand All @@ -6,14 +8,18 @@ use std::{

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
#[cfg(unix)]
use libc::{socklen_t, AF_INET, AF_INET6};
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use {
crate::{driver::ready::Direction, syscall_u32},
std::os::unix::prelude::AsRawFd,
std::os::windows::io::AsRawSocket,
windows_sys::Win32::Networking::WinSock::{socklen_t, AF_INET, AF_INET6},
};

use super::{super::shared_fd::SharedFd, Op, OpAble};
use crate::{buf::IoBufMut, net::unix::SocketAddr as UnixSocketAddr, BufResult};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::{driver::ready::Direction, syscall_u32};

pub(crate) struct Recv<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
Expand Down Expand Up @@ -112,7 +118,7 @@ impl<T: IoBufMut> Op<RecvMsg<T>> {
info.2.msg_iov = info.1.as_mut_ptr();
info.2.msg_iovlen = 1;
info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as socklen_t;

Op::submit_with(RecvMsg { fd, buf, info })
}
Expand All @@ -127,15 +133,15 @@ impl<T: IoBufMut> Op<RecvMsg<T>> {

let addr = unsafe {
match storage.ss_family as libc::c_int {
libc::AF_INET => {
AF_INET => {
// Safety: if the ss_family field is AF_INET then storage must be a
// sockaddr_in.
let addr: &libc::sockaddr_in = transmute(&storage);
let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
let port = u16::from_be(addr.sin_port);
SocketAddr::V4(SocketAddrV4::new(ip, port))
}
libc::AF_INET6 => {
AF_INET6 => {
// Safety: if the ss_family field is AF_INET6 then storage must be a
// sockaddr_in6.
let addr: &libc::sockaddr_in6 = transmute(&storage);
Expand Down Expand Up @@ -214,7 +220,7 @@ impl<T: IoBufMut> Op<RecvMsgUnix<T>> {
info.2.msg_iov = info.1.as_mut_ptr();
info.2.msg_iovlen = 1;
info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
info.2.msg_namelen = std::mem::size_of::<libc::sockaddr_storage>() as socklen_t;

Op::submit_with(RecvMsgUnix { fd, buf, info })
}
Expand Down
32 changes: 27 additions & 5 deletions monoio/src/driver/op/send.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))]
use std::os::unix::prelude::AsRawFd;
use std::{io, net::SocketAddr};

#[cfg(all(target_os = "linux", feature = "iouring"))]
use io_uring::{opcode, types};
use socket2::SockAddr;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use {
crate::{driver::ready::Direction, syscall_u32},
std::os::unix::prelude::AsRawFd,
crate::syscall, std::os::windows::io::AsRawSocket,
windows_sys::Win32::Networking::WinSock::send,
};

use super::{super::shared_fd::SharedFd, Op, OpAble};
use crate::{buf::IoBuf, net::unix::SocketAddr as UnixSocketAddr, BufResult};
#[cfg(any(feature = "legacy", feature = "poll-io"))]
use crate::{driver::ready::Direction, syscall_u32};

pub(crate) struct Send<T> {
/// Holds a strong ref to the FD, preventing the file from being closed
Expand Down Expand Up @@ -84,19 +88,31 @@ impl<T: IoBuf> OpAble for Send<T> {

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
#[cfg(unix)]
let fd = self.fd.as_raw_fd();
#[cfg(windows)]
let fd = self.fd.as_raw_socket();
#[cfg(target_os = "linux")]
#[allow(deprecated)]
let flags = libc::MSG_NOSIGNAL as _;
#[cfg(not(target_os = "linux"))]
let flags = 0;

syscall_u32!(send(
#[cfg(windows)]
return syscall!(send(
fd,
self.buf.read_ptr(),
self.buf.bytes_init() as _,
flags
));

#[cfg(unix)]
return syscall_u32!(send(
fd,
self.buf.read_ptr() as _,
self.buf.bytes_init(),
flags
))
));
}
}

Expand Down Expand Up @@ -176,7 +192,10 @@ impl<T: IoBuf> OpAble for SendMsg<T> {
const FLAGS: libc::c_int = libc::MSG_NOSIGNAL as libc::c_int;
#[cfg(not(target_os = "linux"))]
const FLAGS: libc::c_int = 0;
#[cfg(unix)]
let fd = self.fd.as_raw_fd();
#[cfg(windows)]
let fd = self.fd.as_raw_socket();
syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS))
}
}
Expand Down Expand Up @@ -258,7 +277,10 @@ impl<T: IoBuf> OpAble for SendMsgUnix<T> {
const FLAGS: libc::c_int = libc::MSG_NOSIGNAL as libc::c_int;
#[cfg(not(target_os = "linux"))]
const FLAGS: libc::c_int = 0;
#[cfg(unix)]
let fd = self.fd.as_raw_fd();
#[cfg(windows)]
let fd = self.fd.as_raw_socket();
syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS))
}
}
Loading

0 comments on commit 0d41c53

Please sign in to comment.