Skip to content
Closed

dump #6245

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions codex-rs/core/src/event_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ use uuid::Uuid;

use crate::user_instructions::UserInstructions;

fn is_user_shell_command_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<user_shell_command>")
|| lowered.starts_with("<user_shell_command_output>")
}

fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
Expand All @@ -31,7 +38,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
for content_item in message.iter() {
match content_item {
ContentItem::InputText { text } => {
if is_session_prefix(text) {
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
return None;
}
content.push(UserInput::Text { text: text.clone() });
Expand All @@ -42,7 +49,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
});
}
ContentItem::OutputText { text } => {
if is_session_prefix(text) {
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
return None;
}
warn!("Output text in user message: {}", text);
Expand Down
65 changes: 59 additions & 6 deletions codex-rs/core/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

Expand All @@ -23,6 +24,7 @@ use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecOutputStream;
use crate::protocol::SandboxPolicy;
use crate::protocol::UserCommandOutputDeltaEvent;
use crate::sandboxing::CommandSpec;
use crate::sandboxing::ExecEnv;
use crate::sandboxing::SandboxManager;
Expand Down Expand Up @@ -84,6 +86,43 @@ pub struct StdoutStream {
pub tx_event: Sender<Event>,
}

type DeltaEventFn = dyn Fn(&str, ExecOutputStream, Vec<u8>) -> EventMsg + Send + Sync;

#[derive(Clone)]
pub struct DeltaEventBuilder {
inner: Arc<DeltaEventFn>,
}

impl DeltaEventBuilder {
pub fn exec_command() -> Self {
Self {
inner: Arc::new(|call_id, stream, chunk| {
EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: call_id.to_string(),
stream,
chunk,
})
}),
}
}

pub fn user_command() -> Self {
Self {
inner: Arc::new(|call_id, stream, chunk| {
EventMsg::UserCommandOutputDelta(UserCommandOutputDeltaEvent {
call_id: call_id.to_string(),
stream,
chunk,
})
}),
}
}

pub fn build(&self, call_id: &str, stream: ExecOutputStream, chunk: Vec<u8>) -> EventMsg {
(self.inner)(call_id, stream, chunk)
}
}

pub async fn process_exec_tool_call(
params: ExecParams,
sandbox_type: SandboxType,
Expand Down Expand Up @@ -138,6 +177,7 @@ pub(crate) async fn execute_exec_env(
env: ExecEnv,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
delta_event_builder: Option<DeltaEventBuilder>,
) -> Result<ExecToolCallOutput> {
let ExecEnv {
command,
Expand All @@ -161,7 +201,15 @@ pub(crate) async fn execute_exec_env(
};

let start = Instant::now();
let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream).await;
let delta_event_builder = delta_event_builder.unwrap_or_else(DeltaEventBuilder::exec_command);
let raw_output_result = exec(
params,
sandbox,
sandbox_policy,
stdout_stream,
delta_event_builder.clone(),
)
.await;
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox, duration)
}
Expand Down Expand Up @@ -434,6 +482,7 @@ async fn exec(
sandbox: SandboxType,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
delta_event_builder: DeltaEventBuilder,
) -> Result<RawExecToolCallOutput> {
#[cfg(target_os = "windows")]
if sandbox == SandboxType::WindowsRestrictedToken {
Expand Down Expand Up @@ -465,7 +514,7 @@ async fn exec(
env,
)
.await?;
consume_truncated_output(child, timeout, stdout_stream).await
consume_truncated_output(child, timeout, stdout_stream, delta_event_builder).await
}

/// Consumes the output of a child process, truncating it so it is suitable for
Expand All @@ -474,6 +523,7 @@ async fn consume_truncated_output(
mut child: Child,
timeout: Duration,
stdout_stream: Option<StdoutStream>,
delta_event_builder: DeltaEventBuilder,
) -> Result<RawExecToolCallOutput> {
// Both stdout and stderr were configured with `Stdio::piped()`
// above, therefore `take()` should normally return `Some`. If it doesn't
Expand All @@ -497,12 +547,14 @@ async fn consume_truncated_output(
stdout_stream.clone(),
false,
Some(agg_tx.clone()),
delta_event_builder.clone(),
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
stdout_stream.clone(),
true,
Some(agg_tx.clone()),
delta_event_builder.clone(),
));

let (exit_status, timed_out) = tokio::select! {
Expand Down Expand Up @@ -554,6 +606,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
stream: Option<StdoutStream>,
is_stderr: bool,
aggregate_tx: Option<Sender<Vec<u8>>>,
delta_event_builder: DeltaEventBuilder,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
let mut tmp = [0u8; READ_CHUNK_SIZE];
Expand All @@ -571,15 +624,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
{
let chunk = tmp[..n].to_vec();
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
let msg = delta_event_builder.build(
&stream.call_id,
if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk,
});
);
let event = Event {
id: stream.sub_id.clone(),
msg,
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/rollout/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_)
| EventMsg::UserCommandBegin(_)
| EventMsg::UserCommandOutputDelta(_)
| EventMsg::UserCommandEnd(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::UndoCompleted(_)
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/sandboxing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,5 @@ pub async fn execute_env(
policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
) -> crate::error::Result<ExecToolCallOutput> {
execute_exec_env(env.clone(), policy, stdout_stream).await
execute_exec_env(env.clone(), policy, stdout_stream, None).await
}
Loading
Loading