Skip to content

Commit 226f326

Browse files
authored
code polish (#392)
2 parents 561b7e5 + 2e54c5d commit 226f326

File tree

2 files changed

+62
-31
lines changed

2 files changed

+62
-31
lines changed

core/src/co_pool/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,14 @@ impl<'p> CoroutinePool<'p> {
197197
assert_eq!(PoolState::Running, self.stopping()?);
198198
_ = self.try_timed_schedule_task(dur)?;
199199
assert_eq!(PoolState::Stopping, self.stopped()?);
200-
Ok(())
201200
}
202-
PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")),
203-
PoolState::Stopped => Ok(()),
201+
PoolState::Stopping => {
202+
_ = self.try_timed_schedule_task(dur)?;
203+
assert_eq!(PoolState::Stopping, self.stopped()?);
204+
}
205+
PoolState::Stopped => {}
204206
}
207+
Ok(())
205208
}
206209

207210
/// Submit a new task to this pool.

core/src/net/event_loop.rs

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,21 @@ pub(crate) struct EventLoop<'e> {
5858
phantom_data: PhantomData<&'e EventLoop<'e>>,
5959
}
6060

61+
impl Drop for EventLoop<'_> {
62+
fn drop(&mut self) {
63+
if std::thread::panicking() {
64+
return;
65+
}
66+
self.stop_sync(Duration::from_secs(30))
67+
.unwrap_or_else(|e| panic!("Failed to stop event-loop {} due to {e} !", self.name()));
68+
assert_eq!(
69+
PoolState::Stopped,
70+
self.state(),
71+
"The event-loop is not stopped !"
72+
);
73+
}
74+
}
75+
6176
impl<'e> Deref for EventLoop<'e> {
6277
type Target = CoroutinePool<'e>;
6378

@@ -399,49 +414,62 @@ impl<'e> EventLoop<'e> {
399414
match self.state() {
400415
PoolState::Running => {
401416
assert_eq!(PoolState::Running, self.stopping()?);
402-
let timeout_time = crate::common::get_timeout_time(wait_time);
403-
loop {
404-
let left_time = timeout_time.saturating_sub(crate::common::now());
405-
if 0 == left_time {
406-
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
407-
}
408-
self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?;
409-
if self.is_empty() && self.get_running_size() == 0 {
410-
assert_eq!(PoolState::Stopping, self.stopped()?);
411-
return Ok(());
412-
}
413-
}
417+
self.do_stop_sync(wait_time)
414418
}
415-
PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")),
419+
PoolState::Stopping => self.do_stop_sync(wait_time),
416420
PoolState::Stopped => Ok(()),
417421
}
418422
}
419423

424+
fn do_stop_sync(&mut self, wait_time: Duration) -> std::io::Result<()> {
425+
let timeout_time = crate::common::get_timeout_time(wait_time);
426+
loop {
427+
let left_time = timeout_time.saturating_sub(crate::common::now());
428+
if 0 == left_time {
429+
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
430+
}
431+
self.wait_event(Some(Duration::from_nanos(left_time).min(SLICE)))?;
432+
if self.is_empty() && self.get_running_size() == 0 {
433+
assert_eq!(PoolState::Stopping, self.stopped()?);
434+
return Ok(());
435+
}
436+
}
437+
}
438+
420439
pub(super) fn stop(&self, wait_time: Duration) -> std::io::Result<()> {
421440
match self.state() {
422441
PoolState::Running => {
423442
if BeanFactory::remove_bean::<JoinHandle<()>>(&self.get_thread_name()).is_some() {
424443
assert_eq!(PoolState::Running, self.stopping()?);
425-
//开启了单独的线程
426-
let (lock, cvar) = &*self.stop;
427-
let result = cvar
428-
.wait_timeout_while(
429-
lock.lock().expect("lock failed"),
430-
wait_time,
431-
|&mut pending| pending,
432-
)
433-
.expect("lock failed");
434-
if result.1.timed_out() {
435-
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
436-
}
437-
assert_eq!(PoolState::Stopping, self.stopped()?);
444+
return self.do_stop(wait_time);
438445
}
439-
Ok(())
446+
Err(Error::new(
447+
ErrorKind::Unsupported,
448+
"use EventLoop::stop_sync instead !",
449+
))
440450
}
441-
PoolState::Stopping => Err(Error::new(ErrorKind::Other, "should never happens")),
451+
PoolState::Stopping => self.do_stop(wait_time),
442452
PoolState::Stopped => Ok(()),
443453
}
444454
}
455+
456+
fn do_stop(&self, wait_time: Duration) -> std::io::Result<()> {
457+
//开启了单独的线程
458+
let (lock, cvar) = &*self.stop;
459+
let result = cvar
460+
.wait_timeout_while(
461+
lock.lock().expect("lock failed"),
462+
wait_time,
463+
|&mut pending| pending,
464+
)
465+
.expect("lock failed");
466+
if result.1.timed_out() {
467+
return Err(Error::new(ErrorKind::TimedOut, "stop timeout !"));
468+
}
469+
assert_eq!(PoolState::Stopping, self.stopped()?);
470+
assert!(BeanFactory::remove_bean::<Self>(self.name()).is_some());
471+
Ok(())
472+
}
445473
}
446474

447475
impl_current_for!(EVENT_LOOP, EventLoop<'e>);

0 commit comments

Comments
 (0)