@@ -24,16 +24,34 @@ cfg_if::cfg_if! {
2424 }
2525}
2626
27+ cfg_if:: cfg_if! {
28+ if #[ cfg( all( windows, feature = "iocp" ) ) ] {
29+ use dashmap:: DashMap ;
30+ use std:: ffi:: { c_longlong, c_uint} ;
31+ use windows_sys:: core:: { PCSTR , PSTR } ;
32+ use windows_sys:: Win32 :: Networking :: WinSock :: {
33+ LPWSAOVERLAPPED_COMPLETION_ROUTINE , SEND_RECV_FLAGS , SOCKADDR , SOCKET , WSABUF ,
34+ } ;
35+ use windows_sys:: Win32 :: System :: IO :: OVERLAPPED ;
36+ }
37+ }
38+
2739#[ repr( C ) ]
2840#[ derive( Debug ) ]
2941pub ( crate ) struct EventLoop < ' e > {
3042 stop : Arc < ( Mutex < bool > , Condvar ) > ,
3143 shared_stop : Arc < ( Mutex < AtomicUsize > , Condvar ) > ,
3244 cpu : usize ,
33- #[ cfg( all( target_os = "linux" , feature = "io_uring" ) ) ]
45+ #[ cfg( any(
46+ all( target_os = "linux" , feature = "io_uring" ) ,
47+ all( windows, feature = "iocp" )
48+ ) ) ]
3449 operator : crate :: net:: operator:: Operator < ' e > ,
3550 #[ allow( clippy:: type_complexity) ]
36- #[ cfg( all( target_os = "linux" , feature = "io_uring" ) ) ]
51+ #[ cfg( any(
52+ all( target_os = "linux" , feature = "io_uring" ) ,
53+ all( windows, feature = "iocp" )
54+ ) ) ]
3755 syscall_wait_table : DashMap < usize , Arc < ( Mutex < Option < c_longlong > > , Condvar ) > > ,
3856 selector : Poller ,
3957 pool : CoroutinePool < ' e > ,
@@ -87,9 +105,15 @@ impl<'e> EventLoop<'e> {
87105 stop : Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ,
88106 shared_stop,
89107 cpu,
90- #[ cfg( all( target_os = "linux" , feature = "io_uring" ) ) ]
108+ #[ cfg( any(
109+ all( target_os = "linux" , feature = "io_uring" ) ,
110+ all( windows, feature = "iocp" )
111+ ) ) ]
91112 operator : crate :: net:: operator:: Operator :: new ( cpu) ?,
92- #[ cfg( all( target_os = "linux" , feature = "io_uring" ) ) ]
113+ #[ cfg( any(
114+ all( target_os = "linux" , feature = "io_uring" ) ,
115+ all( windows, feature = "iocp" )
116+ ) ) ]
93117 syscall_wait_table : DashMap :: new ( ) ,
94118 selector : Poller :: new ( ) ?,
95119 pool : CoroutinePool :: new ( name, stack_size, min_size, max_size, keep_alive_time) ,
@@ -222,6 +246,8 @@ impl<'e> EventLoop<'e> {
222246 cfg_if:: cfg_if! {
223247 if #[ cfg( all( target_os = "linux" , feature = "io_uring" ) ) ] {
224248 left_time = self . adapt_io_uring( left_time) ?;
249+ } else if #[ cfg( all( windows, feature = "iocp" ) ) ] {
250+ left_time = self . adapt_iocp( left_time) ?;
225251 }
226252 }
227253
@@ -267,6 +293,28 @@ impl<'e> EventLoop<'e> {
267293 Ok ( left_time)
268294 }
269295
296+ #[ cfg( all( windows, feature = "iocp" ) ) ]
297+ fn adapt_iocp ( & self , mut left_time : Option < Duration > ) -> std:: io:: Result < Option < Duration > > {
298+ // use IOCP
299+ let ( count, mut cq, left) = self . operator . select ( left_time, 0 ) ?;
300+ if count > 0 {
301+ for cqe in & mut cq {
302+ let token = cqe. token ;
303+ if let Some ( ( _, pair) ) = self . syscall_wait_table . remove ( & token) {
304+ let ( lock, cvar) = & * pair;
305+ let mut pending = lock. lock ( ) . expect ( "lock failed" ) ;
306+ * pending = Some ( cqe. result ) ;
307+ cvar. notify_one ( ) ;
308+ }
309+ unsafe { self . resume ( token) } ;
310+ }
311+ }
312+ if left != left_time {
313+ left_time = Some ( left. unwrap_or ( Duration :: ZERO ) ) ;
314+ }
315+ Ok ( left_time)
316+ }
317+
270318 unsafe fn resume ( & self , token : usize ) {
271319 if COROUTINE_TOKENS . remove ( & token) . is_none ( ) {
272320 return ;
@@ -446,6 +494,34 @@ impl_io_uring!(mkdirat(dirfd: c_int, pathname: *const c_char, mode: mode_t) -> c
446494impl_io_uring ! ( renameat( olddirfd: c_int, oldpath: * const c_char, newdirfd: c_int, newpath: * const c_char) -> c_int) ;
447495impl_io_uring ! ( renameat2( olddirfd: c_int, oldpath: * const c_char, newdirfd: c_int, newpath: * const c_char, flags: c_uint) -> c_int) ;
448496
497+ macro_rules! impl_iocp {
498+ ( $syscall: ident( $( $arg: ident : $arg_type: ty) ,* ) -> $result: ty ) => {
499+ #[ cfg( all( windows, feature = "iocp" ) ) ]
500+ impl EventLoop <' _> {
501+ #[ allow( non_snake_case, clippy:: too_many_arguments) ]
502+ pub ( super ) fn $syscall(
503+ & self ,
504+ $( $arg: $arg_type) ,*
505+ ) -> std:: io:: Result <Arc <( Mutex <Option <c_longlong>>, Condvar ) >> {
506+ let token = EventLoop :: token( SyscallName :: $syscall) ;
507+ self . operator. $syscall( token, $( $arg, ) * ) ?;
508+ let arc = Arc :: new( ( Mutex :: new( None ) , Condvar :: new( ) ) ) ;
509+ assert!(
510+ self . syscall_wait_table. insert( token, arc. clone( ) ) . is_none( ) ,
511+ "The previous token was not retrieved in a timely manner"
512+ ) ;
513+ Ok ( arc)
514+ }
515+ }
516+ }
517+ }
518+
519+ impl_iocp ! ( accept( fd: SOCKET , addr: * mut SOCKADDR , len: * mut c_int) -> c_int) ;
520+ impl_iocp ! ( recv( fd: SOCKET , buf: PSTR , len: c_int, flags: SEND_RECV_FLAGS ) -> c_int) ;
521+ impl_iocp ! ( WSARecv ( fd: SOCKET , buf: * const WSABUF , dwbuffercount: c_uint, lpnumberofbytesrecvd: * mut c_uint, lpflags : * mut c_uint, lpoverlapped: * mut OVERLAPPED , lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE ) -> c_int) ;
522+ impl_iocp ! ( send( fd: SOCKET , buf: PCSTR , len: c_int, flags: SEND_RECV_FLAGS ) -> c_int) ;
523+ impl_iocp ! ( WSASend ( fd: SOCKET , buf: * const WSABUF , dwbuffercount: c_uint, lpnumberofbytesrecvd: * mut c_uint, dwflags : c_uint, lpoverlapped: * mut OVERLAPPED , lpcompletionroutine : LPWSAOVERLAPPED_COMPLETION_ROUTINE ) -> c_int) ;
524+
449525#[ cfg( all( test, not( all( unix, feature = "preemptive" ) ) ) ) ]
450526mod tests {
451527 use crate :: net:: event_loop:: EventLoop ;
0 commit comments