diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index c9edd5409e..f5f71ccb8a 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -15,6 +15,13 @@ use uuid::Uuid; use crate::user_instructions::UserInstructions; +fn is_user_shell_command_prefix(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with("") + || lowered.starts_with("") +} + fn is_session_prefix(text: &str) -> bool { let trimmed = text.trim_start(); let lowered = trimmed.to_ascii_lowercase(); @@ -31,7 +38,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option { for content_item in message.iter() { match content_item { ContentItem::InputText { text } => { - if is_session_prefix(text) { + if is_session_prefix(text) || is_user_shell_command_prefix(text) { return None; } content.push(UserInput::Text { text: text.clone() }); @@ -42,7 +49,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option { }); } ContentItem::OutputText { text } => { - if is_session_prefix(text) { + if is_session_prefix(text) || is_user_shell_command_prefix(text) { return None; } warn!("Output text in user message: {}", text); diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index b4dacd9af6..e9ee9ea82a 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -6,6 +6,7 @@ use std::io; use std::path::Path; use std::path::PathBuf; use std::process::ExitStatus; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -23,6 +24,7 @@ use crate::protocol::EventMsg; use crate::protocol::ExecCommandOutputDeltaEvent; use crate::protocol::ExecOutputStream; use crate::protocol::SandboxPolicy; +use crate::protocol::UserCommandOutputDeltaEvent; use crate::sandboxing::CommandSpec; use crate::sandboxing::ExecEnv; use crate::sandboxing::SandboxManager; @@ -84,6 +86,43 @@ pub struct StdoutStream { pub tx_event: Sender, } +type DeltaEventFn = dyn Fn(&str, ExecOutputStream, Vec) -> EventMsg + Send + Sync; + +#[derive(Clone)] +pub struct DeltaEventBuilder { + inner: Arc, +} + +impl DeltaEventBuilder { + pub fn exec_command() -> Self { + Self { + inner: Arc::new(|call_id, stream, chunk| { + EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { + call_id: call_id.to_string(), + stream, + chunk, + }) + }), + } + } + + pub fn user_command() -> Self { + Self { + inner: Arc::new(|call_id, stream, chunk| { + EventMsg::UserCommandOutputDelta(UserCommandOutputDeltaEvent { + call_id: call_id.to_string(), + stream, + chunk, + }) + }), + } + } + + pub fn build(&self, call_id: &str, stream: ExecOutputStream, chunk: Vec) -> EventMsg { + (self.inner)(call_id, stream, chunk) + } +} + pub async fn process_exec_tool_call( params: ExecParams, sandbox_type: SandboxType, @@ -138,6 +177,7 @@ pub(crate) async fn execute_exec_env( env: ExecEnv, sandbox_policy: &SandboxPolicy, stdout_stream: Option, + delta_event_builder: Option, ) -> Result { let ExecEnv { command, @@ -161,7 +201,15 @@ pub(crate) async fn execute_exec_env( }; let start = Instant::now(); - let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream).await; + let delta_event_builder = delta_event_builder.unwrap_or_else(DeltaEventBuilder::exec_command); + let raw_output_result = exec( + params, + sandbox, + sandbox_policy, + stdout_stream, + delta_event_builder.clone(), + ) + .await; let duration = start.elapsed(); finalize_exec_result(raw_output_result, sandbox, duration) } @@ -434,6 +482,7 @@ async fn exec( sandbox: SandboxType, sandbox_policy: &SandboxPolicy, stdout_stream: Option, + delta_event_builder: DeltaEventBuilder, ) -> Result { #[cfg(target_os = "windows")] if sandbox == SandboxType::WindowsRestrictedToken { @@ -465,7 +514,7 @@ async fn exec( env, ) .await?; - consume_truncated_output(child, timeout, stdout_stream).await + consume_truncated_output(child, timeout, stdout_stream, delta_event_builder).await } /// Consumes the output of a child process, truncating it so it is suitable for @@ -474,6 +523,7 @@ async fn consume_truncated_output( mut child: Child, timeout: Duration, stdout_stream: Option, + delta_event_builder: DeltaEventBuilder, ) -> Result { // Both stdout and stderr were configured with `Stdio::piped()` // above, therefore `take()` should normally return `Some`. If it doesn't @@ -497,12 +547,14 @@ async fn consume_truncated_output( stdout_stream.clone(), false, Some(agg_tx.clone()), + delta_event_builder.clone(), )); let stderr_handle = tokio::spawn(read_capped( BufReader::new(stderr_reader), stdout_stream.clone(), true, Some(agg_tx.clone()), + delta_event_builder.clone(), )); let (exit_status, timed_out) = tokio::select! { @@ -554,6 +606,7 @@ async fn read_capped( stream: Option, is_stderr: bool, aggregate_tx: Option>>, + delta_event_builder: DeltaEventBuilder, ) -> io::Result>> { let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); let mut tmp = [0u8; READ_CHUNK_SIZE]; @@ -571,15 +624,15 @@ async fn read_capped( && emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL { let chunk = tmp[..n].to_vec(); - let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { - call_id: stream.call_id.clone(), - stream: if is_stderr { + let msg = delta_event_builder.build( + &stream.call_id, + if is_stderr { ExecOutputStream::Stderr } else { ExecOutputStream::Stdout }, chunk, - }); + ); let event = Event { id: stream.sub_id.clone(), msg, diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index e008832641..afe5c87b4b 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -41,6 +41,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::AgentReasoning(_) | EventMsg::AgentReasoningRawContent(_) | EventMsg::TokenCount(_) + | EventMsg::UserCommandBegin(_) + | EventMsg::UserCommandOutputDelta(_) + | EventMsg::UserCommandEnd(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) | EventMsg::UndoCompleted(_) diff --git a/codex-rs/core/src/sandboxing/mod.rs b/codex-rs/core/src/sandboxing/mod.rs index 608b39ceef..a51a0728dc 100644 --- a/codex-rs/core/src/sandboxing/mod.rs +++ b/codex-rs/core/src/sandboxing/mod.rs @@ -165,5 +165,5 @@ pub async fn execute_env( policy: &SandboxPolicy, stdout_stream: Option, ) -> crate::error::Result { - execute_exec_env(env.clone(), policy, stdout_stream).await + execute_exec_env(env.clone(), policy, stdout_stream, None).await } diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 0e57e1b728..5c7935667a 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -1,28 +1,36 @@ use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; -use codex_protocol::models::ShellToolCallParams; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; -use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::error; use uuid::Uuid; use crate::codex::TurnContext; +use crate::exec::DeltaEventBuilder; +use crate::exec::ExecToolCallOutput; +use crate::exec::SandboxType; +use crate::exec::StdoutStream; +use crate::exec::StreamOutput; +use crate::exec::execute_exec_env; +use crate::exec_env::create_env; +use crate::parse_command::parse_command; use crate::protocol::EventMsg; +use crate::protocol::SandboxPolicy; use crate::protocol::TaskStartedEvent; +use crate::protocol::UserCommandBeginEvent; +use crate::protocol::UserCommandEndEvent; +use crate::sandboxing::ExecEnv; use crate::state::TaskKind; -use crate::tools::context::ToolPayload; -use crate::tools::parallel::ToolCallRuntime; -use crate::tools::router::ToolCall; -use crate::tools::router::ToolRouter; -use crate::turn_diff_tracker::TurnDiffTracker; +use crate::tools::format_exec_output_for_model; +use crate::tools::format_exec_output_str; use super::SessionTask; use super::SessionTaskContext; -const USER_SHELL_TOOL_NAME: &str = "local_shell"; - #[derive(Clone)] pub(crate) struct UserShellCommandTask { command: String, @@ -78,34 +86,150 @@ impl SessionTask for UserShellCommandTask { } }; - let params = ShellToolCallParams { + fn build_user_message(text: String) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { text }], + } + } + + let call_id = Uuid::new_v4().to_string(); + let raw_command = self.command.clone(); + let command_text = format!("\n{raw_command}\n"); + let command_items = [build_user_message(command_text)]; + session + .record_conversation_items(turn_context.as_ref(), &command_items) + .await; + + let parsed_cmd = parse_command(&shell_invocation); + session + .send_event( + turn_context.as_ref(), + EventMsg::UserCommandBegin(UserCommandBeginEvent { + call_id: call_id.clone(), + command: shell_invocation.clone(), + cwd: turn_context.cwd.clone(), + parsed_cmd, + }), + ) + .await; + + let exec_env = ExecEnv { command: shell_invocation, - workdir: None, + cwd: turn_context.cwd.clone(), + env: create_env(&turn_context.shell_environment_policy), timeout_ms: None, + sandbox: SandboxType::None, with_escalated_permissions: None, justification: None, + arg0: None, }; - let tool_call = ToolCall { - tool_name: USER_SHELL_TOOL_NAME.to_string(), - call_id: Uuid::new_v4().to_string(), - payload: ToolPayload::LocalShell { params }, - }; + let stdout_stream = Some(StdoutStream { + sub_id: turn_context.sub_id.clone(), + call_id: call_id.clone(), + tx_event: session.get_tx_event(), + }); - let router = Arc::new(ToolRouter::from_config(&turn_context.tools_config, None)); - let tracker = Arc::new(Mutex::new(TurnDiffTracker::new())); - let runtime = ToolCallRuntime::new( - Arc::clone(&router), - Arc::clone(&session), - Arc::clone(&turn_context), - Arc::clone(&tracker), + let sandbox_policy = SandboxPolicy::DangerFullAccess; + let exec_future = execute_exec_env( + exec_env, + &sandbox_policy, + stdout_stream, + Some(DeltaEventBuilder::user_command()), ); + tokio::pin!(exec_future); + + let exec_result = tokio::select! { + res = &mut exec_future => Some(res), + _ = cancellation_token.cancelled() => None, + }; - if let Err(err) = runtime - .handle_tool_call(tool_call, cancellation_token) - .await - { - error!("user shell command failed: {err:?}"); + match exec_result { + None => { + let aborted_message = "command aborted by user".to_string(); + let aborted_text = format!( + "\n{aborted_message}\n" + ); + let output_items = [build_user_message(aborted_text)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + session + .send_event( + turn_context.as_ref(), + EventMsg::UserCommandEnd(UserCommandEndEvent { + call_id, + stdout: String::new(), + stderr: aborted_message.clone(), + aggregated_output: aborted_message.clone(), + exit_code: -1, + duration: Duration::ZERO, + formatted_output: aborted_message, + }), + ) + .await; + } + Some(Ok(output)) => { + session + .send_event( + turn_context.as_ref(), + EventMsg::UserCommandEnd(UserCommandEndEvent { + call_id: call_id.clone(), + stdout: output.stdout.text.clone(), + stderr: output.stderr.text.clone(), + aggregated_output: output.aggregated_output.text.clone(), + exit_code: output.exit_code, + duration: output.duration, + formatted_output: format_exec_output_str(&output), + }), + ) + .await; + + let output_payload = format_exec_output_for_model(&output); + let output_text = format!( + "\n{output_payload}\n" + ); + let output_items = [build_user_message(output_text)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + } + Some(Err(err)) => { + error!("user shell command failed: {err:?}"); + let message = format!("execution error: {err:?}"); + let exec_output = ExecToolCallOutput { + exit_code: -1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(message.clone()), + aggregated_output: StreamOutput::new(message.clone()), + duration: Duration::ZERO, + timed_out: false, + }; + session + .send_event( + turn_context.as_ref(), + EventMsg::UserCommandEnd(UserCommandEndEvent { + call_id, + stdout: exec_output.stdout.text.clone(), + stderr: exec_output.stderr.text.clone(), + aggregated_output: exec_output.aggregated_output.text.clone(), + exit_code: exec_output.exit_code, + duration: exec_output.duration, + formatted_output: format_exec_output_str(&exec_output), + }), + ) + .await; + let output_payload = format_exec_output_for_model(&exec_output); + let output_text = format!( + "\n{output_payload}\n" + ); + let output_items = [build_user_message(output_text)]; + session + .record_conversation_items(turn_context.as_ref(), &output_items) + .await; + } } None } diff --git a/codex-rs/core/tests/suite/user_shell_cmd.rs b/codex-rs/core/tests/suite/user_shell_cmd.rs index 0832d7e6b4..b8626485e5 100644 --- a/codex-rs/core/tests/suite/user_shell_cmd.rs +++ b/codex-rs/core/tests/suite/user_shell_cmd.rs @@ -1,9 +1,9 @@ use codex_core::ConversationManager; use codex_core::NewConversation; use codex_core::protocol::EventMsg; -use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::Op; use codex_core::protocol::TurnAbortReason; +use codex_core::protocol::UserCommandEndEvent; use core_test_support::load_default_config_for_test; use core_test_support::wait_for_event; use std::path::PathBuf; @@ -63,8 +63,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { .submit(Op::RunUserShellCommand { command: list_cmd }) .await .unwrap(); - let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await; - let EventMsg::ExecCommandEnd(ExecCommandEndEvent { + let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await; + let EventMsg::UserCommandEnd(UserCommandEndEvent { stdout, exit_code, .. }) = msg else { @@ -84,8 +84,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() { .submit(Op::RunUserShellCommand { command: cat_cmd }) .await .unwrap(); - let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await; - let EventMsg::ExecCommandEnd(ExecCommandEndEvent { + let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await; + let EventMsg::UserCommandEnd(UserCommandEndEvent { mut stdout, exit_code, .. @@ -128,7 +128,7 @@ async fn user_shell_cmd_can_be_interrupted() { .unwrap(); // Wait until it has started (ExecCommandBegin), then interrupt. - let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await; + let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandBegin(_))).await; codex.submit(Op::Interrupt).await.unwrap(); // Expect a TurnAborted(Interrupted) notification. diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 93e0e493bb..46ab5c3f14 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -522,6 +522,9 @@ impl EventProcessor for EventProcessorWithHumanOutput { | EventMsg::McpListToolsResponse(_) | EventMsg::ListCustomPromptsResponse(_) | EventMsg::RawResponseItem(_) + | EventMsg::UserCommandBegin(_) + | EventMsg::UserCommandOutputDelta(_) + | EventMsg::UserCommandEnd(_) | EventMsg::UserMessage(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 96e875153d..a1cd35dd5f 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -274,6 +274,9 @@ async fn run_codex_tool_session_inner( | EventMsg::ExecCommandBegin(_) | EventMsg::ExecCommandOutputDelta(_) | EventMsg::ExecCommandEnd(_) + | EventMsg::UserCommandBegin(_) + | EventMsg::UserCommandOutputDelta(_) + | EventMsg::UserCommandEnd(_) | EventMsg::BackgroundEvent(_) | EventMsg::StreamError(_) | EventMsg::PatchApplyBegin(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 2d0b0f013a..c811a17b13 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -493,6 +493,15 @@ pub enum EventMsg { ExecCommandEnd(ExecCommandEndEvent), + /// Notification that the user initiated a shell command. + UserCommandBegin(UserCommandBeginEvent), + + /// Incremental chunk of output from a running user command. + UserCommandOutputDelta(UserCommandOutputDeltaEvent), + + /// Completion notification for a user shell command. + UserCommandEnd(UserCommandEndEvent), + /// Notification that the agent attached a local image via the view_image tool. ViewImageToolCall(ViewImageToolCallEvent), @@ -1267,6 +1276,51 @@ pub struct ExecCommandOutputDeltaEvent { pub chunk: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +pub struct UserCommandBeginEvent { + /// Identifier so this can be paired with the UserCommandEnd event. + pub call_id: String, + /// The command to be executed. + pub command: Vec, + /// The command's working directory. + pub cwd: PathBuf, + pub parsed_cmd: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +pub struct UserCommandEndEvent { + /// Identifier for the UserCommandBegin that finished. + pub call_id: String, + /// Captured stdout. + pub stdout: String, + /// Captured stderr. + pub stderr: String, + /// Captured aggregated output. + #[serde(default)] + pub aggregated_output: String, + /// The command's exit code. + pub exit_code: i32, + /// The duration of the command execution. + #[ts(type = "string")] + pub duration: Duration, + /// Formatted output from the command, as seen by the model. + pub formatted_output: String, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)] +pub struct UserCommandOutputDeltaEvent { + /// Identifier for the UserCommandBegin that produced this chunk. + pub call_id: String, + /// Which stream produced this chunk. + pub stream: ExecOutputStream, + /// Raw bytes from the stream (may not be valid UTF-8). + #[serde_as(as = "serde_with::base64::Base64")] + #[schemars(with = "String")] + #[ts(type = "string")] + pub chunk: Vec, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct BackgroundEventEvent { pub message: String, diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 5239c66e80..2eb3a3754e 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -40,6 +40,8 @@ use codex_core::protocol::TurnAbortReason; use codex_core::protocol::TurnDiffEvent; use codex_core::protocol::UndoCompletedEvent; use codex_core::protocol::UndoStartedEvent; +use codex_core::protocol::UserCommandBeginEvent; +use codex_core::protocol::UserCommandEndEvent; use codex_core::protocol::UserMessageEvent; use codex_core::protocol::ViewImageToolCallEvent; use codex_core::protocol::WarningEvent; @@ -627,6 +629,30 @@ impl ChatWidget { self.defer_or_handle(|q| q.push_exec_end(ev), |s| s.handle_exec_end_now(ev2)); } + fn on_user_command_begin(&mut self, ev: UserCommandBeginEvent) { + self.flush_answer_stream_with_separator(); + let ev2 = ev.clone(); + self.defer_or_handle( + |q| q.push_user_command_begin(ev), + |s| s.handle_user_command_begin_now(ev2), + ); + } + + fn on_user_command_output_delta( + &mut self, + _ev: codex_core::protocol::UserCommandOutputDeltaEvent, + ) { + // TODO: Handle streaming exec output if/when implemented + } + + fn on_user_command_end(&mut self, ev: UserCommandEndEvent) { + let ev2 = ev.clone(); + self.defer_or_handle( + |q| q.push_user_command_end(ev), + |s| s.handle_user_command_end_now(ev2), + ); + } + fn on_mcp_tool_call_begin(&mut self, ev: McpToolCallBeginEvent) { let ev2 = ev.clone(); self.defer_or_handle(|q| q.push_mcp_begin(ev), |s| s.handle_mcp_begin_now(ev2)); @@ -785,11 +811,23 @@ impl ChatWidget { self.request_redraw(); } - pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) { - let running = self.running_commands.remove(&ev.call_id); + fn handle_command_end_internal( + &mut self, + call_id: String, + aggregated_output: String, + formatted_output: String, + exit_code: i32, + duration: std::time::Duration, + default_is_user_shell_command: bool, + ) { + let running = self.running_commands.remove(&call_id); let (command, parsed, is_user_shell_command) = match running { Some(rc) => (rc.command, rc.parsed_cmd, rc.is_user_shell_command), - None => (vec![ev.call_id.clone()], Vec::new(), false), + None => ( + vec![call_id.clone()], + Vec::new(), + default_is_user_shell_command, + ), }; let needs_new = self @@ -800,7 +838,7 @@ impl ChatWidget { if needs_new { self.flush_active_cell(); self.active_cell = Some(Box::new(new_active_exec_command( - ev.call_id.clone(), + call_id.clone(), command, parsed, is_user_shell_command, @@ -813,13 +851,13 @@ impl ChatWidget { .and_then(|c| c.as_any_mut().downcast_mut::()) { cell.complete_call( - &ev.call_id, + &call_id, CommandOutput { - exit_code: ev.exit_code, - formatted_output: ev.formatted_output.clone(), - aggregated_output: ev.aggregated_output.clone(), + exit_code, + formatted_output, + aggregated_output, }, - ev.duration, + duration, ); if cell.should_flush() { self.flush_active_cell(); @@ -827,6 +865,44 @@ impl ChatWidget { } } + pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) { + let ExecCommandEndEvent { + call_id, + aggregated_output, + formatted_output, + exit_code, + duration, + .. + } = ev; + self.handle_command_end_internal( + call_id, + aggregated_output, + formatted_output, + exit_code, + duration, + false, + ); + } + + pub(crate) fn handle_user_command_end_now(&mut self, ev: UserCommandEndEvent) { + let UserCommandEndEvent { + call_id, + aggregated_output, + formatted_output, + exit_code, + duration, + .. + } = ev; + self.handle_command_end_internal( + call_id, + aggregated_output, + formatted_output, + exit_code, + duration, + true, + ); + } + pub(crate) fn handle_patch_apply_end_now( &mut self, event: codex_core::protocol::PatchApplyEndEvent, @@ -875,14 +951,19 @@ impl ChatWidget { }); } - pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) { - // Ensure the status indicator is visible while the command runs. + fn handle_command_begin_internal( + &mut self, + call_id: String, + command: Vec, + parsed_cmd: Vec, + is_user_shell_command: bool, + ) { self.running_commands.insert( - ev.call_id.clone(), + call_id.clone(), RunningCommand { - command: ev.command.clone(), - parsed_cmd: ev.parsed_cmd.clone(), - is_user_shell_command: ev.is_user_shell_command, + command: command.clone(), + parsed_cmd: parsed_cmd.clone(), + is_user_shell_command, }, ); if let Some(cell) = self @@ -890,10 +971,10 @@ impl ChatWidget { .as_mut() .and_then(|c| c.as_any_mut().downcast_mut::()) && let Some(new_exec) = cell.with_added_call( - ev.call_id.clone(), - ev.command.clone(), - ev.parsed_cmd.clone(), - ev.is_user_shell_command, + call_id.clone(), + command.clone(), + parsed_cmd.clone(), + is_user_shell_command, ) { *cell = new_exec; @@ -901,16 +982,37 @@ impl ChatWidget { self.flush_active_cell(); self.active_cell = Some(Box::new(new_active_exec_command( - ev.call_id.clone(), - ev.command.clone(), - ev.parsed_cmd, - ev.is_user_shell_command, + call_id, + command, + parsed_cmd, + is_user_shell_command, ))); } self.request_redraw(); } + pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) { + let ExecCommandBeginEvent { + call_id, + command, + parsed_cmd, + is_user_shell_command, + .. + } = ev; + self.handle_command_begin_internal(call_id, command, parsed_cmd, is_user_shell_command); + } + + pub(crate) fn handle_user_command_begin_now(&mut self, ev: UserCommandBeginEvent) { + let UserCommandBeginEvent { + call_id, + command, + parsed_cmd, + .. + } = ev; + self.handle_command_begin_internal(call_id, command, parsed_cmd, true); + } + pub(crate) fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) { self.flush_answer_stream_with_separator(); self.flush_active_cell(); @@ -1453,7 +1555,8 @@ impl ChatWidget { match msg { EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_) - | EventMsg::ExecCommandOutputDelta(_) => {} + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::UserCommandOutputDelta(_) => {} _ => { tracing::trace!("handle_codex_event: {:?}", msg); } @@ -1506,9 +1609,12 @@ impl ChatWidget { } EventMsg::ExecCommandBegin(ev) => self.on_exec_command_begin(ev), EventMsg::ExecCommandOutputDelta(delta) => self.on_exec_command_output_delta(delta), + EventMsg::UserCommandBegin(ev) => self.on_user_command_begin(ev), + EventMsg::UserCommandOutputDelta(delta) => self.on_user_command_output_delta(delta), EventMsg::PatchApplyBegin(ev) => self.on_patch_apply_begin(ev), EventMsg::PatchApplyEnd(ev) => self.on_patch_apply_end(ev), EventMsg::ExecCommandEnd(ev) => self.on_exec_command_end(ev), + EventMsg::UserCommandEnd(ev) => self.on_user_command_end(ev), EventMsg::ViewImageToolCall(ev) => self.on_view_image_tool_call(ev), EventMsg::McpToolCallBegin(ev) => self.on_mcp_tool_call_begin(ev), EventMsg::McpToolCallEnd(ev) => self.on_mcp_tool_call_end(ev), diff --git a/codex-rs/tui/src/chatwidget/interrupts.rs b/codex-rs/tui/src/chatwidget/interrupts.rs index 531de3e646..688a3bac67 100644 --- a/codex-rs/tui/src/chatwidget/interrupts.rs +++ b/codex-rs/tui/src/chatwidget/interrupts.rs @@ -7,6 +7,8 @@ use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::PatchApplyEndEvent; +use codex_core::protocol::UserCommandBeginEvent; +use codex_core::protocol::UserCommandEndEvent; use super::ChatWidget; @@ -16,6 +18,8 @@ pub(crate) enum QueuedInterrupt { ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent), ExecBegin(ExecCommandBeginEvent), ExecEnd(ExecCommandEndEvent), + UserCommandBegin(UserCommandBeginEvent), + UserCommandEnd(UserCommandEndEvent), McpBegin(McpToolCallBeginEvent), McpEnd(McpToolCallEndEvent), PatchEnd(PatchApplyEndEvent), @@ -59,6 +63,14 @@ impl InterruptManager { self.queue.push_back(QueuedInterrupt::ExecEnd(ev)); } + pub(crate) fn push_user_command_begin(&mut self, ev: UserCommandBeginEvent) { + self.queue.push_back(QueuedInterrupt::UserCommandBegin(ev)); + } + + pub(crate) fn push_user_command_end(&mut self, ev: UserCommandEndEvent) { + self.queue.push_back(QueuedInterrupt::UserCommandEnd(ev)); + } + pub(crate) fn push_mcp_begin(&mut self, ev: McpToolCallBeginEvent) { self.queue.push_back(QueuedInterrupt::McpBegin(ev)); } @@ -80,6 +92,8 @@ impl InterruptManager { } QueuedInterrupt::ExecBegin(ev) => chat.handle_exec_begin_now(ev), QueuedInterrupt::ExecEnd(ev) => chat.handle_exec_end_now(ev), + QueuedInterrupt::UserCommandBegin(ev) => chat.handle_user_command_begin_now(ev), + QueuedInterrupt::UserCommandEnd(ev) => chat.handle_user_command_end_now(ev), QueuedInterrupt::McpBegin(ev) => chat.handle_mcp_begin_now(ev), QueuedInterrupt::McpEnd(ev) => chat.handle_mcp_end_now(ev), QueuedInterrupt::PatchEnd(ev) => chat.handle_patch_apply_end_now(ev), diff --git a/codex-rs/tui/src/exec_cell/render.rs b/codex-rs/tui/src/exec_cell/render.rs index 4f21f96e90..516e65c132 100644 --- a/codex-rs/tui/src/exec_cell/render.rs +++ b/codex-rs/tui/src/exec_cell/render.rs @@ -345,7 +345,13 @@ impl ExecCell { Some(false) => "•".red().bold(), None => spinner(call.start_time), }; - let title = if self.is_active() { "Running" } else { "Ran" }; + let title = if self.is_active() { + "Running" + } else if call.is_user_shell_command { + "You Ran:" + } else { + "Ran:" + }; let mut header_line = Line::from(vec![bullet.clone(), " ".into(), title.bold(), " ".into()]);