Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions devenv-tasks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
run_mode: mode,
};

// Create shared signal handler
// Create a global signal handler
let signal_handler = SignalHandler::start();
let cancellation_token = signal_handler.cancellation_token();

let mut tasks_ui = TasksUi::builder(config, verbosity)
.with_cancellation_token(cancellation_token)
.with_cancellation_token(signal_handler.cancellation_token())
.build()
.await?;
let (status, _outputs) = tasks_ui.run().await?;

if signal_handler.last_signal().is_some() {
signal_handler.exit_process();
}

if status.failed + status.dependency_failed > 0 {
std::process::exit(1);
}
Expand Down
78 changes: 49 additions & 29 deletions devenv-tasks/src/signal_handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use tokio::signal;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use tokio::signal::unix::signal;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use nix::libc;

/// A shared signal handler service that manages signal handling across the entire application.
/// This replaces per-task signal handlers with a single, efficient, centralized handler.
pub struct SignalHandler {
cancellation_token: CancellationToken,
last_signal: Arc<AtomicI32>,
_handle: tokio::task::JoinHandle<()>,
}

Expand All @@ -16,43 +20,39 @@ impl SignalHandler {
pub fn start() -> Self {
let cancellation_token = CancellationToken::new();
let token_clone = cancellation_token.clone();
let last_signal = Arc::new(AtomicI32::new(0));
let last_signal_clone = Arc::clone(&last_signal);

let mut sigint = signal(libc::SIGINT.into()).expect("Failed to install SIGINT handler");
let mut sigterm = signal(libc::SIGTERM.into()).expect("Failed to install SIGTERM handler");
let mut sighup = signal(libc::SIGHUP.into()).expect("Failed to install SIGHUP handler");

let handle = tokio::spawn(async move {
let ctrl_c = signal::ctrl_c();

#[cfg(unix)]
{
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler");

tokio::select! {
_ = ctrl_c => {
debug!("Received SIGINT (Ctrl+C), triggering shutdown...");
eprintln!("Received SIGINT (Ctrl+C), shutting down gracefully...");
token_clone.cancel();
}
_ = sigterm.recv() => {
debug!("Received SIGTERM, triggering shutdown...");
eprintln!("Received SIGTERM, shutting down gracefully...");
token_clone.cancel();
}
tokio::select! {
_ = sigint.recv() => {
debug!("Received SIGINT (Ctrl+C), triggering shutdown...");
eprintln!("Received SIGINT (Ctrl+C), shutting down gracefully...");
last_signal_clone.store(libc::SIGINT, Ordering::Relaxed);
token_clone.cancel();
}
}

#[cfg(not(unix))]
{
tokio::select! {
_ = ctrl_c => {
debug!("Received SIGINT (Ctrl+C), triggering shutdown...");
eprintln!("Received SIGINT (Ctrl+C), shutting down gracefully...");
token_clone.cancel();
}
_ = sigterm.recv() => {
debug!("Received SIGTERM, triggering shutdown...");
eprintln!("Received SIGTERM, shutting down gracefully...");
last_signal_clone.store(libc::SIGTERM, Ordering::Relaxed);
token_clone.cancel();
}
_ = sighup.recv() => {
debug!("Received SIGHUP, triggering shutdown...");
eprintln!("Received SIGHUP, shutting down gracefully...");
last_signal_clone.store(libc::SIGHUP, Ordering::Relaxed);
token_clone.cancel();
}
}
});

Self {
cancellation_token,
last_signal,
_handle: handle,
}
}
Expand All @@ -67,6 +67,26 @@ impl SignalHandler {
pub fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled()
}

/// Get the last signal that was received, if any.
pub fn last_signal(&self) -> Option<i32> {
match self.last_signal.load(Ordering::Relaxed) {
0 => None,
i => Some(i)
}
}

/// Restore the default handler for the last received signal and re-raise the signal to terminate with the correct exit code.
pub fn exit_process(&self) -> ! {
let signal = self.last_signal().unwrap_or(libc::SIGTERM);
unsafe {
libc::signal(signal, libc::SIG_DFL);
libc::kill(libc::getpid(), signal);
}

// Unreachable: something went wrong
std::process::exit(1);
}
}

impl Drop for SignalHandler {
Expand Down
25 changes: 7 additions & 18 deletions devenv-tasks/src/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ impl TaskState {

// Create a new process group for better signal handling
// This ensures that signals sent to the parent are propagated to all children
#[cfg(unix)]
{
command.process_group(0);
}
command.process_group(0);

// Set DEVENV_TASK_INPUTS
if let Some(inputs) = &self.task.inputs {
Expand Down Expand Up @@ -331,28 +328,20 @@ impl TaskState {
eprintln!("Task {} received shutdown signal, terminating child process", self.task.name);

// Kill the child process and its process group
#[cfg(unix)]
if let Some(pid) = child.id() {
use ::nix::sys::signal::{self, Signal};
use ::nix::unistd::Pid;

// Send SIGTERM to the process group first for graceful shutdown
let _ = signal::killpg(Pid::from_raw(pid as i32), Signal::SIGTERM);

// Wait a bit for graceful shutdown
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
if child.try_wait().unwrap_or(None).is_none() {
// Force kill if still running
let _ = signal::killpg(Pid::from_raw(pid as i32), Signal::SIGKILL);
signal::killpg(Pid::from_raw(pid as i32), Signal::SIGTERM).ok();
tokio::select! {
_ = child.wait() => {}
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
child.kill().await.ok();
}
}
}

#[cfg(not(unix))]
{
// On non-Unix systems, try to kill the child process directly
let _ = child.kill().await;
}

return Ok(TaskCompleted::Cancelled(now.elapsed()));
}
result = stdout_reader.next_line(), if !stdout_closed => {
Expand Down
Loading