Skip to content

Commit 9ed9e11

Browse files
authored
support cancel coroutine/task (#399)
2 parents 1aa0ebb + c852665 commit 9ed9e11

File tree

18 files changed

+254
-11
lines changed

18 files changed

+254
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### 0.7.x
2+
3+
- [x] support cancel coroutine/task
4+
15
### 0.6.x
26

37
- [x] support custom task and coroutine priority.

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ English | [中文](README_ZH.md)
3434

3535
- [ ] add
3636
performance [benchmark](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview);
37-
- [ ] cancel coroutine/task;
3837
- [ ] add metrics;
3938
- [ ] add synchronization toolkit;
4039
- [ ] support and compatibility for AF_XDP socket;

README_ZH.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
- [ ]
3232
增加性能[基准测试](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview);
33-
- [ ] 取消协程/任务;
3433
- [ ] 增加性能指标监控;
3534
- [ ] 增加并发工具包;
3635
- [ ] 支持AF_XDP套接字;

core/src/co_pool/creator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl Listener<(), Option<usize>> for CoroutineCreator {
2929
.store(pool.get_running_size().saturating_sub(1), Ordering::Release);
3030
}
3131
}
32-
CoroutineState::Error(_) => {
32+
CoroutineState::Cancelled | CoroutineState::Error(_) => {
3333
if let Some(pool) = CoroutinePool::current() {
3434
//worker协程异常退出,需要先回收再创建
3535
pool.running

core/src/co_pool/mod.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue
66
use crate::common::{get_timeout_time, now, CondvarBlocker};
77
use crate::coroutine::suspender::Suspender;
88
use crate::scheduler::{SchedulableCoroutine, Scheduler};
9-
use crate::{error, impl_current_for, impl_display_by_debug, impl_for_named, trace};
9+
use crate::{error, impl_current_for, impl_display_by_debug, impl_for_named, trace, warn};
1010
use dashmap::{DashMap, DashSet};
11+
use once_cell::sync::Lazy;
1112
use std::cell::Cell;
1213
use std::ffi::c_longlong;
1314
use std::io::{Error, ErrorKind};
@@ -25,6 +26,11 @@ mod state;
2526
/// Creator for coroutine pool.
2627
mod creator;
2728

29+
/// `task_name` -> `co_name`
30+
static RUNNING_TASKS: Lazy<DashMap<&str, &str>> = Lazy::new(DashMap::new);
31+
32+
static CANCEL_TASKS: Lazy<DashSet<&str>> = Lazy::new(DashSet::new);
33+
2834
/// The coroutine pool impls.
2935
#[repr(C)]
3036
#[derive(Debug)]
@@ -383,7 +389,17 @@ impl<'p> CoroutinePool<'p> {
383389

384390
fn try_run(&self) -> Option<()> {
385391
self.task_queue.pop().map(|task| {
392+
let tname = task.get_name().to_string().leak();
393+
if CANCEL_TASKS.contains(tname) {
394+
_ = CANCEL_TASKS.remove(tname);
395+
warn!("Cancel task:{} successfully !", tname);
396+
return;
397+
}
398+
if let Some(co) = SchedulableCoroutine::current() {
399+
_ = RUNNING_TASKS.insert(tname, co.name());
400+
}
386401
let (task_name, result) = task.run();
402+
_ = RUNNING_TASKS.remove(tname);
387403
let n = task_name.clone().leak();
388404
if self.no_waits.contains(n) {
389405
_ = self.no_waits.remove(n);
@@ -406,6 +422,44 @@ impl<'p> CoroutinePool<'p> {
406422
}
407423
}
408424

425+
/// Try to cancel a task.
426+
pub fn try_cancel_task(task_name: &str) {
427+
// 检查正在运行的任务是否是要取消的任务
428+
if let Some(info) = RUNNING_TASKS.get(task_name) {
429+
let co_name = *info;
430+
// todo windows support
431+
#[allow(unused_variables)]
432+
if let Some(pthread) = Scheduler::get_scheduling_thread(co_name) {
433+
// 发送SIGVTALRM信号,在运行时取消任务
434+
#[cfg(unix)]
435+
if nix::sys::pthread::pthread_kill(pthread, nix::sys::signal::Signal::SIGVTALRM)
436+
.is_ok()
437+
{
438+
warn!(
439+
"Attempt to cancel task:{} running on coroutine:{} by thread:{}, cancelling...",
440+
task_name, co_name, pthread
441+
);
442+
} else {
443+
error!(
444+
"Attempt to cancel task:{} running on coroutine:{} by thread:{} failed !",
445+
task_name, co_name, pthread
446+
);
447+
}
448+
} else {
449+
// 添加到待取消队列
450+
Scheduler::try_cancel_coroutine(co_name);
451+
warn!(
452+
"Attempt to cancel task:{} running on coroutine:{}, cancelling...",
453+
task_name, co_name
454+
);
455+
}
456+
} else {
457+
// 添加到待取消队列
458+
_ = CANCEL_TASKS.insert(Box::leak(Box::from(task_name)));
459+
warn!("Attempt to cancel task:{}, cancelling...", task_name);
460+
}
461+
}
462+
409463
/// Schedule the tasks.
410464
///
411465
/// Allow multiple threads to concurrently submit task to the pool,

core/src/co_pool/task.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ impl<'t> Task<'t> {
3333
}
3434
}
3535

36+
/// get the task name.
37+
#[must_use]
38+
pub fn get_name(&self) -> &str {
39+
&self.name
40+
}
41+
3642
/// execute the task
3743
///
3844
/// # Errors

core/src/common/constants.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,11 @@ pub enum CoroutineState<Y, R> {
195195
Suspend(Y, u64),
196196
///The coroutine enters the syscall.
197197
Syscall(Y, SyscallName, SyscallState),
198+
/// The coroutine cancelled.
199+
Cancelled,
198200
/// The coroutine completed with a return value.
199201
Complete(R),
200-
/// The coroutine completed with a error message.
202+
/// The coroutine completed with an error message.
201203
Error(&'static str),
202204
}
203205

core/src/coroutine/korosensei.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,10 @@ where
445445
let current = self.state();
446446
match current {
447447
CoroutineState::Running => {
448+
if Suspender::<Yield, Param>::is_cancel() {
449+
self.cancel()?;
450+
return Ok(CoroutineState::Cancelled);
451+
}
448452
let timestamp = Suspender::<Yield, Param>::timestamp();
449453
self.suspend(y, timestamp)?;
450454
Ok(CoroutineState::Suspend(y, timestamp))

core/src/coroutine/listener.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ pub trait Listener<Yield, Return>: Debug {
2727
/// callback when the coroutine enters syscall.
2828
fn on_syscall(&self, local: &CoroutineLocal, old_state: CoroutineState<Yield, Return>) {}
2929

30+
/// Callback when the coroutine is cancelled.
31+
fn on_cancel(&self, local: &CoroutineLocal, old_state: CoroutineState<Yield, Return>) {}
32+
3033
/// Callback when the coroutine is completed.
3134
fn on_complete(
3235
&self,
@@ -91,6 +94,11 @@ where
9194
old_state: CoroutineState<Yield, Return>
9295
), "on_syscall");
9396

97+
broadcast!(on_cancel(
98+
local: &CoroutineLocal,
99+
old_state: CoroutineState<Yield, Return>
100+
), "on_cancel");
101+
94102
broadcast!(on_complete(
95103
local: &CoroutineLocal,
96104
old_state: CoroutineState<Yield, Return>,

core/src/coroutine/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,35 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
135135
callback,
136136
)
137137
}
138+
139+
/// handle SIGVTALRM
140+
#[cfg(unix)]
141+
fn setup_sigvtalrm_handler() {
142+
use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal};
143+
use std::sync::atomic::{AtomicBool, Ordering};
144+
static CANCEL_HANDLER_INITED: AtomicBool = AtomicBool::new(false);
145+
if CANCEL_HANDLER_INITED
146+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
147+
.is_ok()
148+
{
149+
extern "C" fn sigvtalrm_handler<Param, Yield>(_: libc::c_int) {
150+
if let Some(suspender) = suspender::Suspender::<Param, Yield>::current() {
151+
suspender.cancel();
152+
}
153+
}
154+
// install SIGVTALRM signal handler
155+
let mut set = SigSet::empty();
156+
set.add(Signal::SIGVTALRM);
157+
let sa = SigAction::new(
158+
SigHandler::Handler(sigvtalrm_handler::<Param, Yield>),
159+
SaFlags::SA_RESTART,
160+
set,
161+
);
162+
unsafe {
163+
_ = sigaction(Signal::SIGVTALRM, &sa).expect("install SIGVTALRM handler failed !");
164+
}
165+
}
166+
}
138167
}
139168

140169
impl<Yield, Return> Coroutine<'_, (), Yield, Return>
@@ -170,6 +199,8 @@ where
170199
}
171200
Self::init_current(self);
172201
self.running()?;
202+
#[cfg(unix)]
203+
Self::setup_sigvtalrm_handler();
173204
let r = self.raw_resume(arg);
174205
Self::clean_current();
175206
r

0 commit comments

Comments
 (0)