Skip to content

Commit 1710088

Browse files
committed
basic support IOCP Operator
1 parent 1303607 commit 1710088

File tree

15 files changed

+1035
-52
lines changed

15 files changed

+1035
-52
lines changed

.github/workflows/ci-preemptive.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,preemptive,ci --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,preemptive,ci --release
46+
fi

.github/workflows/ci.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,13 @@ if [ "${TARGET}" = "x86_64-unknown-linux-gnu" ]; then
3434
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci
3535
"${CARGO}" test --target "${TARGET}" --no-default-features --features io_uring,ci --release
3636
fi
37+
38+
# test IOCP
39+
if [ "${OS}" = "windows-latest" ]; then
40+
cd "${PROJECT_DIR}"/core
41+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
42+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
43+
cd "${PROJECT_DIR}"/open-coroutine
44+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci
45+
"${CARGO}" test --target "${TARGET}" --no-default-features --features iocp,ci --release
46+
fi

core/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ windows-sys = { workspace = true, features = [
5858
"Win32_Networking_WinSock",
5959
"Win32_System_SystemInformation",
6060
"Win32_System_Diagnostics_Debug",
61+
"Win32_System_WindowsProgramming",
6162
] }
6263
polling = { workspace = true, optional = true }
6364

@@ -95,5 +96,11 @@ net = ["korosensei", "polling", "mio", "crossbeam-utils", "core_affinity"]
9596
# Provide io_uring adaptation, this feature only works in linux.
9697
io_uring = ["net", "io-uring"]
9798

99+
# Provide IOCP adaptation, this feature only works in windows.
100+
iocp = ["net"]
101+
102+
# Provide completion IO adaptation
103+
completion_io = ["io_uring", "iocp"]
104+
98105
# Provide syscall implementation.
99106
syscall = ["net"]

core/src/net/event_loop.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,34 @@ cfg_if::cfg_if! {
2424
}
2525
}
2626

27+
cfg_if::cfg_if! {
28+
if #[cfg(all(windows, feature = "iocp"))] {
29+
use dashmap::DashMap;
30+
use std::ffi::{c_longlong, c_uint};
31+
use windows_sys::core::{PCSTR, PSTR};
32+
use windows_sys::Win32::Networking::WinSock::{
33+
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
34+
};
35+
use windows_sys::Win32::System::IO::OVERLAPPED;
36+
}
37+
}
38+
2739
#[repr(C)]
2840
#[derive(Debug)]
2941
pub(crate) struct EventLoop<'e> {
3042
stop: Arc<(Mutex<bool>, Condvar)>,
3143
shared_stop: Arc<(Mutex<AtomicUsize>, Condvar)>,
3244
cpu: usize,
33-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
45+
#[cfg(any(
46+
all(target_os = "linux", feature = "io_uring"),
47+
all(windows, feature = "iocp")
48+
))]
3449
operator: crate::net::operator::Operator<'e>,
3550
#[allow(clippy::type_complexity)]
36-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
51+
#[cfg(any(
52+
all(target_os = "linux", feature = "io_uring"),
53+
all(windows, feature = "iocp")
54+
))]
3755
syscall_wait_table: DashMap<usize, Arc<(Mutex<Option<c_longlong>>, Condvar)>>,
3856
selector: Poller,
3957
pool: CoroutinePool<'e>,
@@ -87,9 +105,15 @@ impl<'e> EventLoop<'e> {
87105
stop: Arc::new((Mutex::new(false), Condvar::new())),
88106
shared_stop,
89107
cpu,
90-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
108+
#[cfg(any(
109+
all(target_os = "linux", feature = "io_uring"),
110+
all(windows, feature = "iocp")
111+
))]
91112
operator: crate::net::operator::Operator::new(cpu)?,
92-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
113+
#[cfg(any(
114+
all(target_os = "linux", feature = "io_uring"),
115+
all(windows, feature = "iocp")
116+
))]
93117
syscall_wait_table: DashMap::new(),
94118
selector: Poller::new()?,
95119
pool: CoroutinePool::new(name, stack_size, min_size, max_size, keep_alive_time),
@@ -222,6 +246,8 @@ impl<'e> EventLoop<'e> {
222246
cfg_if::cfg_if! {
223247
if #[cfg(all(target_os = "linux", feature = "io_uring"))] {
224248
left_time = self.adapt_io_uring(left_time)?;
249+
} else if #[cfg(all(windows, feature = "iocp"))] {
250+
left_time = self.adapt_iocp(left_time)?;
225251
}
226252
}
227253

@@ -267,6 +293,28 @@ impl<'e> EventLoop<'e> {
267293
Ok(left_time)
268294
}
269295

296+
#[cfg(all(windows, feature = "iocp"))]
297+
fn adapt_iocp(&self, mut left_time: Option<Duration>) -> std::io::Result<Option<Duration>> {
298+
// use IOCP
299+
let (count, mut cq, left) = self.operator.select(left_time, 0)?;
300+
if count > 0 {
301+
for cqe in &mut cq {
302+
let token = cqe.token;
303+
if let Some((_, pair)) = self.syscall_wait_table.remove(&token) {
304+
let (lock, cvar) = &*pair;
305+
let mut pending = lock.lock().expect("lock failed");
306+
*pending = Some(cqe.result);
307+
cvar.notify_one();
308+
}
309+
unsafe { self.resume(token) };
310+
}
311+
}
312+
if left != left_time {
313+
left_time = Some(left.unwrap_or(Duration::ZERO));
314+
}
315+
Ok(left_time)
316+
}
317+
270318
unsafe fn resume(&self, token: usize) {
271319
if COROUTINE_TOKENS.remove(&token).is_none() {
272320
return;
@@ -446,6 +494,34 @@ impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c
446494
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
447495
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
448496

497+
macro_rules! impl_iocp {
498+
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
499+
#[cfg(all(windows, feature = "iocp"))]
500+
impl EventLoop<'_> {
501+
#[allow(non_snake_case, clippy::too_many_arguments)]
502+
pub(super) fn $syscall(
503+
&self,
504+
$($arg: $arg_type),*
505+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
506+
let token = EventLoop::token(SyscallName::$syscall);
507+
self.operator.$syscall(token, $($arg, )*)?;
508+
let arc = Arc::new((Mutex::new(None), Condvar::new()));
509+
assert!(
510+
self.syscall_wait_table.insert(token, arc.clone()).is_none(),
511+
"The previous token was not retrieved in a timely manner"
512+
);
513+
Ok(arc)
514+
}
515+
}
516+
}
517+
}
518+
519+
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
520+
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
521+
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
522+
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
523+
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
524+
449525
#[cfg(all(test, not(all(unix, feature = "preemptive"))))]
450526
mod tests {
451527
use crate::net::event_loop::EventLoop;

core/src/net/mod.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,27 @@ cfg_if::cfg_if! {
1818
}
1919
}
2020

21+
cfg_if::cfg_if! {
22+
if #[cfg(all(windows, feature = "iocp"))] {
23+
use std::ffi::c_uint;
24+
use windows_sys::core::{PCSTR, PSTR};
25+
use windows_sys::Win32::Networking::WinSock::{
26+
LPWSAOVERLAPPED_COMPLETION_ROUTINE, SEND_RECV_FLAGS, SOCKADDR, SOCKET, WSABUF,
27+
};
28+
use windows_sys::Win32::System::IO::OVERLAPPED;
29+
}
30+
}
31+
2132
/// 做C兼容时会用到
2233
pub type UserFunc = extern "C" fn(usize) -> usize;
2334

2435
mod selector;
2536

2637
#[allow(clippy::too_many_arguments)]
27-
#[cfg(all(target_os = "linux", feature = "io_uring"))]
38+
#[cfg(any(
39+
all(target_os = "linux", feature = "io_uring"),
40+
all(windows, feature = "iocp")
41+
))]
2842
mod operator;
2943

3044
#[allow(missing_docs)]
@@ -280,3 +294,24 @@ impl_io_uring!(fsync(fd: c_int) -> c_int);
280294
impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c_int);
281295
impl_io_uring!(renameat(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char) -> c_int);
282296
impl_io_uring!(renameat2(olddirfd: c_int, oldpath: *const c_char, newdirfd: c_int, newpath: *const c_char, flags: c_uint) -> c_int);
297+
298+
macro_rules! impl_iocp {
299+
( $syscall: ident($($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
300+
#[allow(non_snake_case)]
301+
#[cfg(all(windows, feature = "iocp"))]
302+
impl EventLoops {
303+
#[allow(missing_docs)]
304+
pub fn $syscall(
305+
$($arg: $arg_type),*
306+
) -> std::io::Result<Arc<(Mutex<Option<c_longlong>>, Condvar)>> {
307+
Self::event_loop().$syscall($($arg, )*)
308+
}
309+
}
310+
}
311+
}
312+
313+
impl_iocp!(accept(fd: SOCKET, addr: *mut SOCKADDR, len: *mut c_int) -> c_int);
314+
impl_iocp!(recv(fd: SOCKET, buf: PSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
315+
impl_iocp!(WSARecv(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, lpflags : *mut c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);
316+
impl_iocp!(send(fd: SOCKET, buf: PCSTR, len: c_int, flags: SEND_RECV_FLAGS) -> c_int);
317+
impl_iocp!(WSASend(fd: SOCKET, buf: *const WSABUF, dwbuffercount: c_uint, lpnumberofbytesrecvd: *mut c_uint, dwflags : c_uint, lpoverlapped: *mut OVERLAPPED, lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE) -> c_int);

core/src/net/operator/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,9 @@
22
mod linux;
33
#[cfg(all(target_os = "linux", feature = "io_uring"))]
44
pub(crate) use linux::*;
5+
6+
#[allow(non_snake_case)]
7+
#[cfg(all(windows, feature = "iocp"))]
8+
mod windows;
9+
#[cfg(all(windows, feature = "iocp"))]
10+
pub(crate) use windows::*;

0 commit comments

Comments
 (0)