Skip to content

Commit fc431db

Browse files
authored
clean expired result (#382)
2 parents fcc9541 + 4a78c33 commit fc431db

File tree

4 files changed

+51
-26
lines changed

4 files changed

+51
-26
lines changed

core/src/co_pool/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl<'p> CoroutinePool<'p> {
236236
}
237237

238238
/// Attempt to obtain task results with the given `task_name`.
239-
pub fn try_get_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
239+
pub fn try_take_task_result(&self, task_name: &str) -> Option<Result<Option<usize>, &'p str>> {
240240
self.results.remove(task_name).map(|(_, r)| r)
241241
}
242242

@@ -251,7 +251,7 @@ impl<'p> CoroutinePool<'p> {
251251
wait_time: Duration,
252252
) -> std::io::Result<Result<Option<usize>, &str>> {
253253
let key = Box::leak(Box::from(task_name));
254-
if let Some(r) = self.try_get_task_result(key) {
254+
if let Some(r) = self.try_take_task_result(key) {
255255
self.notify(key);
256256
drop(self.waits.remove(key));
257257
return Ok(r);
@@ -260,7 +260,7 @@ impl<'p> CoroutinePool<'p> {
260260
let timeout_time = get_timeout_time(wait_time);
261261
loop {
262262
_ = self.try_run();
263-
if let Some(r) = self.try_get_task_result(key) {
263+
if let Some(r) = self.try_take_task_result(key) {
264264
return Ok(r);
265265
}
266266
if timeout_time.saturating_sub(now()) == 0 {
@@ -285,7 +285,7 @@ impl<'p> CoroutinePool<'p> {
285285
)
286286
.map_err(|e| Error::new(ErrorKind::Other, format!("{e}")))?,
287287
);
288-
if let Some(r) = self.try_get_task_result(key) {
288+
if let Some(r) = self.try_take_task_result(key) {
289289
self.notify(key);
290290
assert!(self.waits.remove(key).is_some());
291291
return Ok(r);
@@ -436,8 +436,8 @@ impl<'p> CoroutinePool<'p> {
436436
}
437437
}
438438
Self::init_current(self);
439-
let left_time = self.try_timeout_schedule(timeout_time);
439+
let r = self.try_timeout_schedule(timeout_time);
440440
Self::clean_current();
441-
left_time
441+
r.map(|(left_time, _)| left_time)
442442
}
443443
}

core/src/net/join.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ use std::time::Duration;
99
#[derive(Debug)]
1010
pub struct JoinHandle(&'static Arc<EventLoop<'static>>, *const c_char);
1111

12+
impl Drop for JoinHandle {
13+
fn drop(&mut self) {
14+
if let Ok(name) = self.get_name() {
15+
// clean data
16+
_ = self.0.try_take_task_result(name);
17+
}
18+
}
19+
}
20+
1221
impl JoinHandle {
1322
/// create `JoinHandle` instance.
1423
pub(crate) fn err(pool: &'static Arc<EventLoop<'static>>) -> Self {

core/src/scheduler.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::coroutine::suspender::Suspender;
77
use crate::coroutine::Coroutine;
88
use crate::{co, impl_current_for, impl_display_by_debug, impl_for_named};
99
use dashmap::DashMap;
10-
use std::collections::{BinaryHeap, VecDeque};
10+
use std::collections::{BinaryHeap, HashMap, VecDeque};
1111
use std::ffi::c_longlong;
1212
use std::io::{Error, ErrorKind};
1313
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -90,7 +90,6 @@ pub struct Scheduler<'s> {
9090
suspend: BinaryHeap<SuspendItem<'s>>,
9191
syscall: DashMap<&'s str, SchedulableCoroutine<'s>>,
9292
syscall_suspend: BinaryHeap<SyscallSuspendItem<'s>>,
93-
results: DashMap<&'s str, Result<Option<usize>, &'s str>>,
9493
}
9594

9695
impl Default for Scheduler<'_> {
@@ -107,9 +106,10 @@ impl Drop for Scheduler<'_> {
107106
if std::thread::panicking() {
108107
return;
109108
}
109+
let name = self.name.clone();
110110
_ = self
111111
.try_timed_schedule(Duration::from_secs(30))
112-
.unwrap_or_else(|_| panic!("Failed to stop scheduler {} !", self.name()));
112+
.unwrap_or_else(|e| panic!("Failed to stop scheduler {name} due to {e} !"));
113113
assert!(
114114
self.ready.is_empty(),
115115
"There are still coroutines to be carried out in the ready queue:{:#?} !",
@@ -134,6 +134,7 @@ impl_current_for!(SCHEDULER, Scheduler<'s>);
134134

135135
impl_display_by_debug!(Scheduler<'s>);
136136

137+
#[allow(clippy::type_complexity)]
137138
impl<'s> Scheduler<'s> {
138139
/// Creates a new scheduler.
139140
#[must_use]
@@ -149,7 +150,6 @@ impl<'s> Scheduler<'s> {
149150
suspend: BinaryHeap::default(),
150151
syscall: DashMap::default(),
151152
syscall_suspend: BinaryHeap::default(),
152-
results: DashMap::default(),
153153
}
154154
}
155155

@@ -229,8 +229,9 @@ impl<'s> Scheduler<'s> {
229229
///
230230
/// # Errors
231231
/// see `try_timeout_schedule`.
232-
pub fn try_schedule(&mut self) -> std::io::Result<()> {
233-
self.try_timeout_schedule(u64::MAX).map(|_| ())
232+
pub fn try_schedule(&mut self) -> std::io::Result<HashMap<&str, Result<Option<usize>, &str>>> {
233+
self.try_timeout_schedule(u64::MAX)
234+
.map(|(_, results)| results)
234235
}
235236

236237
/// Try scheduling the coroutines for up to `dur`.
@@ -240,7 +241,10 @@ impl<'s> Scheduler<'s> {
240241
///
241242
/// # Errors
242243
/// see `try_timeout_schedule`.
243-
pub fn try_timed_schedule(&mut self, dur: Duration) -> std::io::Result<u64> {
244+
pub fn try_timed_schedule(
245+
&mut self,
246+
dur: Duration,
247+
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
244248
self.try_timeout_schedule(get_timeout_time(dur))
245249
}
246250

@@ -253,18 +257,25 @@ impl<'s> Scheduler<'s> {
253257
///
254258
/// # Errors
255259
/// if change to ready fails.
256-
pub fn try_timeout_schedule(&mut self, timeout_time: u64) -> std::io::Result<u64> {
260+
pub fn try_timeout_schedule(
261+
&mut self,
262+
timeout_time: u64,
263+
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
257264
Self::init_current(self);
258-
let left_time = self.do_schedule(timeout_time);
265+
let r = self.do_schedule(timeout_time);
259266
Self::clean_current();
260-
left_time
267+
r
261268
}
262269

263-
fn do_schedule(&mut self, timeout_time: u64) -> std::io::Result<u64> {
270+
fn do_schedule(
271+
&mut self,
272+
timeout_time: u64,
273+
) -> std::io::Result<(u64, HashMap<&str, Result<Option<usize>, &str>>)> {
274+
let mut results = HashMap::new();
264275
loop {
265276
let left_time = timeout_time.saturating_sub(now());
266277
if 0 == left_time {
267-
return Ok(0);
278+
return Ok((0, results));
268279
}
269280
self.check_ready()?;
270281
// schedule coroutines
@@ -295,14 +306,14 @@ impl<'s> Scheduler<'s> {
295306
CoroutineState::Complete(result) => {
296307
let co_name = Box::leak(Box::from(coroutine.name()));
297308
assert!(
298-
self.results.insert(co_name, Ok(result)).is_none(),
309+
results.insert(co_name, Ok(result)).is_none(),
299310
"not consume result"
300311
);
301312
}
302313
CoroutineState::Error(message) => {
303314
let co_name = Box::leak(Box::from(coroutine.name()));
304315
assert!(
305-
self.results.insert(co_name, Err(message)).is_none(),
316+
results.insert(co_name, Err(message)).is_none(),
306317
"not consume result"
307318
);
308319
}
@@ -315,7 +326,7 @@ impl<'s> Scheduler<'s> {
315326
}
316327
continue;
317328
}
318-
return Ok(left_time);
329+
return Ok((left_time, results));
319330
}
320331
}
321332

core/tests/scheduler.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ fn scheduler_basic() -> std::io::Result<()> {
2020
None,
2121
None,
2222
)?;
23-
scheduler.try_schedule()
23+
scheduler.try_schedule()?;
24+
Ok(())
2425
}
2526

2627
#[cfg(not(all(unix, feature = "preemptive")))]
@@ -36,7 +37,8 @@ fn scheduler_backtrace() -> std::io::Result<()> {
3637
None,
3738
None,
3839
)?;
39-
scheduler.try_schedule()
40+
scheduler.try_schedule()?;
41+
Ok(())
4042
}
4143

4244
#[test]
@@ -62,7 +64,8 @@ fn scheduler_suspend() -> std::io::Result<()> {
6264
None,
6365
None,
6466
)?;
65-
scheduler.try_schedule()
67+
scheduler.try_schedule()?;
68+
Ok(())
6669
}
6770

6871
#[test]
@@ -80,7 +83,8 @@ fn scheduler_delay() -> std::io::Result<()> {
8083
)?;
8184
scheduler.try_schedule()?;
8285
std::thread::sleep(Duration::from_millis(100));
83-
scheduler.try_schedule()
86+
scheduler.try_schedule()?;
87+
Ok(())
8488
}
8589

8690
#[cfg(not(all(unix, feature = "preemptive")))]
@@ -122,5 +126,6 @@ fn scheduler_listener() -> std::io::Result<()> {
122126
None,
123127
None,
124128
)?;
125-
scheduler.try_schedule()
129+
scheduler.try_schedule()?;
130+
Ok(())
126131
}

0 commit comments

Comments
 (0)