Skip to content

Commit d5defe3

Browse files
committed
add cancel tests
1 parent 9ed9e11 commit d5defe3

File tree

7 files changed

+125
-24
lines changed

7 files changed

+125
-24
lines changed

core/src/co_pool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ impl<'p> CoroutinePool<'p> {
378378
"The coroutine pool has reached its maximum size !",
379379
));
380380
}
381-
self.deref().submit_co(f, stack_size, priority).map(|()| {
381+
self.deref().submit_co(f, stack_size, priority).map(|_| {
382382
_ = self.running.fetch_add(1, Ordering::Release);
383383
})
384384
}

core/src/coroutine/suspender.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,22 @@ impl<Param, Yield> Suspender<'_, Param, Yield> {
3434
self.suspend_with(arg)
3535
}
3636

37+
pub(crate) fn timestamp() -> u64 {
38+
TIMESTAMP
39+
.with(|s| unsafe {
40+
s.as_ptr()
41+
.as_mut()
42+
.unwrap_or_else(|| {
43+
panic!(
44+
"thread:{} get TIMESTAMP current failed",
45+
std::thread::current().name().unwrap_or("unknown")
46+
)
47+
})
48+
.pop_front()
49+
})
50+
.unwrap_or(0)
51+
}
52+
3753
/// Cancel the execution of the coroutine.
3854
pub fn cancel(&self) -> ! {
3955
CANCEL.with(|s| unsafe {
@@ -51,22 +67,6 @@ impl<Param, Yield> Suspender<'_, Param, Yield> {
5167
unreachable!()
5268
}
5369

54-
pub(crate) fn timestamp() -> u64 {
55-
TIMESTAMP
56-
.with(|s| unsafe {
57-
s.as_ptr()
58-
.as_mut()
59-
.unwrap_or_else(|| {
60-
panic!(
61-
"thread:{} get TIMESTAMP current failed",
62-
std::thread::current().name().unwrap_or("unknown")
63-
)
64-
})
65-
.pop_front()
66-
})
67-
.unwrap_or(0)
68-
}
69-
7070
pub(crate) fn is_cancel() -> bool {
7171
CANCEL
7272
.with(|s| unsafe {

core/src/scheduler.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl<'s> Scheduler<'s> {
186186
f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
187187
stack_size: Option<usize>,
188188
priority: Option<c_longlong>,
189-
) -> std::io::Result<()> {
189+
) -> std::io::Result<String> {
190190
self.submit_raw_co(co!(
191191
Some(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
192192
f,
@@ -204,12 +204,13 @@ impl<'s> Scheduler<'s> {
204204
///
205205
/// Allow multiple threads to concurrently submit coroutine to the scheduler,
206206
/// but only allow one thread to execute scheduling.
207-
pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> {
207+
pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<String> {
208208
for listener in self.listeners.clone() {
209209
co.add_raw_listener(listener);
210210
}
211+
let co_name = co.name().to_string();
211212
self.ready.push(co);
212-
Ok(())
213+
Ok(co_name)
213214
}
214215

215216
/// Resume a coroutine from the syscall table to the ready queue,

core/tests/co_pool.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#[cfg(not(all(unix, feature = "preemptive")))]
22
#[test]
33
fn co_pool_basic() -> std::io::Result<()> {
4-
let task_name = "test_simple";
54
let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
65
pool.set_max_size(1);
76
assert!(pool.is_empty());
@@ -13,7 +12,7 @@ fn co_pool_basic() -> std::io::Result<()> {
1312
)?;
1413
assert!(!pool.is_empty());
1514
pool.submit_task(
16-
Some(String::from(task_name)),
15+
Some(String::from("test_simple")),
1716
|_| {
1817
println!("2");
1918
Some(2)
@@ -74,3 +73,29 @@ fn co_pool_stop() -> std::io::Result<()> {
7473
)
7574
.map(|_| ())
7675
}
76+
77+
#[cfg(not(all(unix, feature = "preemptive")))]
78+
#[test]
79+
fn co_pool_cancel() -> std::io::Result<()> {
80+
let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
81+
pool.set_max_size(1);
82+
assert!(pool.is_empty());
83+
let task_name = pool.submit_task(
84+
Some(String::from("test_panic")),
85+
|_| panic!("test panic, just ignore it"),
86+
None,
87+
None,
88+
)?;
89+
assert!(!pool.is_empty());
90+
open_coroutine_core::co_pool::CoroutinePool::try_cancel_task(&task_name);
91+
pool.submit_task(
92+
Some(String::from("test_simple")),
93+
|_| {
94+
println!("2");
95+
Some(2)
96+
},
97+
None,
98+
None,
99+
)?;
100+
pool.try_schedule_task()
101+
}

core/tests/coroutine.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,42 @@ fn coroutine_syscall_not_preemptive() -> std::io::Result<()> {
280280
))
281281
}
282282
}
283+
284+
#[cfg(all(unix, not(feature = "preemptive")))]
285+
#[test]
286+
fn coroutine_cancel() -> std::io::Result<()> {
287+
use std::os::unix::prelude::JoinHandleExt;
288+
let pair = std::sync::Arc::new((std::sync::Mutex::new(true), std::sync::Condvar::new()));
289+
let pair2 = pair.clone();
290+
let handle = std::thread::Builder::new()
291+
.name("cancel".to_string())
292+
.spawn(move || {
293+
let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?;
294+
assert_eq!(CoroutineState::Cancelled, coroutine.resume()?);
295+
assert_eq!(CoroutineState::Cancelled, coroutine.state());
296+
// should execute to here
297+
let (lock, cvar) = &*pair2;
298+
let mut pending = lock.lock().unwrap();
299+
*pending = false;
300+
cvar.notify_one();
301+
Ok::<(), std::io::Error>(())
302+
})?;
303+
// wait for the thread to start up
304+
std::thread::sleep(std::time::Duration::from_millis(500));
305+
nix::sys::pthread::pthread_kill(handle.as_pthread_t(), nix::sys::signal::Signal::SIGVTALRM)?;
306+
let (lock, cvar) = &*pair;
307+
let result = cvar
308+
.wait_timeout_while(
309+
lock.lock().unwrap(),
310+
std::time::Duration::from_millis(3000),
311+
|&mut pending| pending,
312+
)
313+
.unwrap();
314+
if result.1.timed_out() {
315+
Err(std::io::Error::other(
316+
"The test thread should send signals to coroutines in running state",
317+
))
318+
} else {
319+
Ok(())
320+
}
321+
}

core/tests/scheduler.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ fn scheduler_listener() -> std::io::Result<()> {
117117

118118
let mut scheduler = Scheduler::default();
119119
scheduler.add_listener(TestListener::default());
120-
scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
121-
scheduler.submit_co(
120+
_ = scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
121+
_ = scheduler.submit_co(
122122
|_, _| {
123123
println!("2");
124124
None
@@ -129,3 +129,31 @@ fn scheduler_listener() -> std::io::Result<()> {
129129
scheduler.try_schedule()?;
130130
Ok(())
131131
}
132+
133+
#[test]
134+
fn scheduler_try_cancel_coroutine() -> std::io::Result<()> {
135+
let mut scheduler = Scheduler::default();
136+
let co_name = scheduler.submit_co(
137+
|suspender, _| {
138+
println!("[coroutine1] suspend");
139+
suspender.suspend();
140+
println!("[coroutine1] back");
141+
None
142+
},
143+
None,
144+
None,
145+
)?;
146+
Scheduler::try_cancel_coroutine(&co_name);
147+
_ = scheduler.submit_co(
148+
|suspender, _| {
149+
println!("[coroutine2] suspend");
150+
suspender.suspend();
151+
println!("[coroutine2] back");
152+
None
153+
},
154+
None,
155+
None,
156+
)?;
157+
scheduler.try_schedule()?;
158+
Ok(())
159+
}

open-coroutine/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,14 @@ mod tests {
375375
fn test() {
376376
init(Config::single());
377377
_ = any_join!(task!(|_| 1, ()), task!(|_| 2, ()), task!(|_| 3, ()));
378+
task!(
379+
|_| {
380+
println!("Try cancel!");
381+
},
382+
(),
383+
)
384+
.try_cancel()
385+
.expect("cancel failed");
378386
let join = task!(
379387
|_| {
380388
println!("Hello, world!");

0 commit comments

Comments
 (0)