From ac049b880f5e994531f243d0430d9dd3141e6b4f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 15:13:22 -0700 Subject: [PATCH 01/16] add a function to normalize history --- codex-rs/core/src/codex.rs | 75 ++--------- codex-rs/core/src/codex/compact.rs | 1 + codex-rs/core/src/conversation_history.rs | 156 +++++++++++++++++++++- codex-rs/core/src/state/session.rs | 4 +- codex-rs/core/tests/suite/abort_tasks.rs | 122 +++++++++++++++++ 5 files changed, 287 insertions(+), 71 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 785d48ec588..0d706acb1f2 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Debug; use std::path::PathBuf; @@ -829,7 +828,7 @@ impl Session { history.record_items(std::iter::once(response_item)); } RolloutItem::Compacted(compacted) => { - let snapshot = history.contents(); + let snapshot = history.get_history(); let user_messages = collect_user_messages(&snapshot); let rebuilt = build_compacted_history( self.build_initial_context(turn_context), @@ -841,7 +840,7 @@ impl Session { _ => {} } } - history.contents() + history.get_history() } /// Append ResponseItems to the in-memory conversation history only. @@ -891,7 +890,7 @@ impl Session { } pub(crate) async fn history_snapshot(&self) -> Vec { - let state = self.state.lock().await; + let mut state = self.state.lock().await; state.history_snapshot() } @@ -988,11 +987,12 @@ impl Session { self.send_event(turn_context, event).await; } + // todo (aibrahim): we should always append to history. In a follow up PR, we should remove the need for this function. /// Build the full turn input by concatenating the current conversation /// history with additional items for this turn. pub async fn turn_input_with_history(&self, extra: Vec) -> Vec { let history = { - let state = self.state.lock().await; + let mut state = self.state.lock().await; state.history_snapshot() }; [history, extra].concat() @@ -1500,6 +1500,7 @@ pub(crate) async fn run_task( // model sees a fresh conversation without the parent session's history. // For normal turns, continue recording to the session history as before. let is_review_mode = turn_context.is_review_mode; + // TODO:(aibrahim): review thread should be a conversation history type. let mut review_thread_history: Vec = Vec::new(); if is_review_mode { // Seed review threads with environment context so the model knows the working directory. @@ -1901,61 +1902,6 @@ async fn try_run_turn( task_kind: TaskKind, cancellation_token: CancellationToken, ) -> CodexResult { - // call_ids that are part of this response. - let completed_call_ids = prompt - .input - .iter() - .filter_map(|ri| match ri { - ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id), - ResponseItem::LocalShellCall { - call_id: Some(call_id), - .. - } => Some(call_id), - ResponseItem::CustomToolCallOutput { call_id, .. } => Some(call_id), - _ => None, - }) - .collect::>(); - - // call_ids that were pending but are not part of this response. - // This usually happens because the user interrupted the model before we responded to one of its tool calls - // and then the user sent a follow-up message. - let missing_calls = { - prompt - .input - .iter() - .filter_map(|ri| match ri { - ResponseItem::FunctionCall { call_id, .. } => Some(call_id), - ResponseItem::LocalShellCall { - call_id: Some(call_id), - .. - } => Some(call_id), - ResponseItem::CustomToolCall { call_id, .. } => Some(call_id), - _ => None, - }) - .filter_map(|call_id| { - if completed_call_ids.contains(&call_id) { - None - } else { - Some(call_id.clone()) - } - }) - .map(|call_id| ResponseItem::CustomToolCallOutput { - call_id, - output: "aborted".to_string(), - }) - .collect::>() - }; - let prompt: Cow = if missing_calls.is_empty() { - Cow::Borrowed(prompt) - } else { - // Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses. - let input = [missing_calls, prompt.input.clone()].concat(); - Cow::Owned(Prompt { - input, - ..prompt.clone() - }) - }; - let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, @@ -1964,11 +1910,12 @@ async fn try_run_turn( effort: turn_context.client.get_reasoning_effort(), summary: turn_context.client.get_reasoning_summary(), }); + sess.persist_rollout_items(&[rollout_item]).await; let mut stream = turn_context .client .clone() - .stream_with_task_kind(prompt.as_ref(), task_kind) + .stream_with_task_kind(prompt, task_kind) .or_cancel(&cancellation_token) .await??; @@ -2951,7 +2898,7 @@ mod tests { rollout_items.push(RolloutItem::ResponseItem(assistant1.clone())); let summary1 = "summary one"; - let snapshot1 = live_history.contents(); + let snapshot1 = live_history.get_history(); let user_messages1 = collect_user_messages(&snapshot1); let rebuilt1 = build_compacted_history( session.build_initial_context(turn_context), @@ -2984,7 +2931,7 @@ mod tests { rollout_items.push(RolloutItem::ResponseItem(assistant2.clone())); let summary2 = "summary two"; - let snapshot2 = live_history.contents(); + let snapshot2 = live_history.get_history(); let user_messages2 = collect_user_messages(&snapshot2); let rebuilt2 = build_compacted_history( session.build_initial_context(turn_context), @@ -3016,7 +2963,7 @@ mod tests { live_history.record_items(std::iter::once(&assistant3)); rollout_items.push(RolloutItem::ResponseItem(assistant3.clone())); - (rollout_items, live_history.contents()) + (rollout_items, live_history.get_history()) } #[tokio::test] diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index bbc1b976489..353cf5333cd 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -64,6 +64,7 @@ async fn run_compact_task_inner( input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); + // TODO (aibrahim): this should be a history snapshot. let mut turn_input = sess .turn_input_with_history(vec![initial_input_for_turn.clone().into()]) .await; diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index 7c23e4dc297..cb55e2a0ec0 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -1,3 +1,4 @@ +use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; /// Transcript of conversation history @@ -12,11 +13,6 @@ impl ConversationHistory { Self { items: Vec::new() } } - /// Returns a clone of the contents in the transcript. - pub(crate) fn contents(&self) -> Vec { - self.items.clone() - } - /// `items` is ordered from oldest to newest. pub(crate) fn record_items(&mut self, items: I) where @@ -32,6 +28,156 @@ impl ConversationHistory { } } + pub(crate) fn get_history(&mut self) -> Vec { + self.normalize_history(); + self.contents() + } + + /// This function enforces a couple of invariants on the in-memory history: + /// 1. every call (function/custom) has a corresponding output entry + /// 2. every output has a corresponding call entry + pub(crate) fn normalize_history(&mut self) { + // all function/tool calls must have a corresponding output + self.ensure_call_outputs_present(); + + // all outputs must have a corresponding function/tool call + self.remove_orphan_outputs(); + } + + /// Returns a clone of the contents in the transcript. + fn contents(&self) -> Vec { + self.items.clone() + } + + fn ensure_call_outputs_present(&mut self) { + let mut missing_outputs_to_insert: Vec = Vec::new(); + + for item in &self.items { + match item { + ResponseItem::FunctionCall { call_id, .. } => { + let has_output = self.items.iter().any(|i| match i { + ResponseItem::FunctionCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + + if !has_output { + missing_outputs_to_insert.push(ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }); + } + } + ResponseItem::CustomToolCall { call_id, .. } => { + let has_output = self.items.iter().any(|i| match i { + ResponseItem::CustomToolCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + + if !has_output { + missing_outputs_to_insert.push(ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + output: "aborted".to_string(), + }); + } + } + // LocalShellCall is represented in upstream streams by a FunctionCallOutput + ResponseItem::LocalShellCall { call_id, .. } => { + if let Some(call_id) = call_id.as_ref() { + let has_output = self.items.iter().any(|i| match i { + ResponseItem::FunctionCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + + if !has_output { + missing_outputs_to_insert.push(ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }); + } + } + } + ResponseItem::Reasoning { .. } + | ResponseItem::WebSearchCall { .. } + | ResponseItem::FunctionCallOutput { .. } + | ResponseItem::CustomToolCallOutput { .. } + | ResponseItem::Other + | ResponseItem::Message { .. } => { + // nothing to do for these variants + } + } + } + + if !missing_outputs_to_insert.is_empty() { + self.items.extend(missing_outputs_to_insert); + } + } + + fn remove_orphan_outputs(&mut self) { + // Work on a snapshot to avoid borrowing `self.items` while mutating it. + let snapshot = self.items.clone(); + let mut orphan_output_call_ids: std::collections::HashSet = + std::collections::HashSet::new(); + + for item in &snapshot { + match item { + ResponseItem::FunctionCallOutput { call_id, .. } => { + let has_call = snapshot.iter().any(|i| match i { + ResponseItem::FunctionCall { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + + if !has_call { + orphan_output_call_ids.insert(call_id.clone()); + } + } + ResponseItem::CustomToolCallOutput { call_id, .. } => { + let has_call = snapshot.iter().any(|i| match i { + ResponseItem::CustomToolCall { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + + if !has_call { + orphan_output_call_ids.insert(call_id.clone()); + } + } + ResponseItem::FunctionCall { .. } + | ResponseItem::CustomToolCall { .. } + | ResponseItem::LocalShellCall { .. } + | ResponseItem::Reasoning { .. } + | ResponseItem::WebSearchCall { .. } + | ResponseItem::Other + | ResponseItem::Message { .. } => { + // nothing to do for these variants + } + } + } + + if !orphan_output_call_ids.is_empty() { + let ids = orphan_output_call_ids; + self.items.retain(|i| match i { + ResponseItem::FunctionCallOutput { call_id, .. } + | ResponseItem::CustomToolCallOutput { call_id, .. } => !ids.contains(call_id), + _ => true, + }); + } + } + pub(crate) fn replace(&mut self, items: Vec) { self.items = items; } diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index eaa3bcb92aa..4ff8ee29acb 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -36,8 +36,8 @@ impl SessionState { self.history.record_items(items) } - pub(crate) fn history_snapshot(&self) -> Vec { - self.history.contents() + pub(crate) fn history_snapshot(&mut self) -> Vec { + self.history.get_history() } pub(crate) fn replace_history(&mut self, items: Vec) { diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index dcc65fc7420..c03a300bd1d 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use codex_core::protocol::EventMsg; @@ -5,7 +6,9 @@ use codex_core::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once; +use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::test_codex::test_codex; @@ -67,3 +70,122 @@ async fn interrupt_long_running_tool_emits_turn_aborted() { ) .await; } + +/// After an interrupt we expect the next request to the model to include both +/// the original tool call and an `"aborted"` `function_call_output`. This test +/// exercises the follow-up flow: it sends another user turn, inspects the mock +/// responses server, and ensures the model receives the synthesized abort. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn interrupt_tool_records_history_entries() { + let command = vec![ + "bash".to_string(), + "-lc".to_string(), + "sleep 60".to_string(), + ]; + let call_id = "call-history"; + + let args = json!({ + "command": command, + "timeout_ms": 60_000 + }) + .to_string(); + let first_body = sse(vec![ + ev_response_created("resp-history"), + ev_function_call(call_id, "shell", &args), + ev_completed("resp-history"), + ]); + let follow_up_body = sse(vec![ + ev_response_created("resp-followup"), + ev_completed("resp-followup"), + ]); + + let server = start_mock_server().await; + let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await; + + let fixture = test_codex().build(&server).await.unwrap(); + let codex = Arc::clone(&fixture.codex); + + let wait_timeout = Duration::from_secs(5); + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "start history recording".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::ExecCommandBegin(_)), + wait_timeout, + ) + .await; + + codex.submit(Op::Interrupt).await.unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TurnAborted(_)), + wait_timeout, + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "follow up".into(), + }], + }) + .await + .unwrap(); + + wait_for_event_with_timeout( + &codex, + |ev| matches!(ev, EventMsg::TaskComplete(_)), + wait_timeout, + ) + .await; + + let requests = response_mock.requests(); + assert!( + requests.len() >= 2, + "expected at least two calls to the responses API" + ); + + let mut call_seen = false; + let mut abort_seen = false; + + for request in requests { + let input = request.input(); + for window in input.windows(2) { + let current = &window[0]; + let next = &window[1]; + if current.get("type").and_then(|v| v.as_str()) == Some("function_call") + && current.get("call_id").and_then(|v| v.as_str()) == Some(call_id) + { + call_seen = true; + if next.get("type").and_then(|v| v.as_str()) == Some("function_call_output") + && next.get("call_id").and_then(|v| v.as_str()) == Some(call_id) + { + let content_matches = + next.get("output").and_then(serde_json::Value::as_str) == Some("aborted"); + if content_matches { + abort_seen = true; + break; + } + } + } + } + if call_seen && abort_seen { + break; + } + } + + assert!(call_seen, "function call not recorded in responses payload"); + assert!( + abort_seen, + "aborted function call output not recorded in responses payload" + ); +} From 6dce61e95f2213f76576229f28674b692040fc1a Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 15:30:42 -0700 Subject: [PATCH 02/16] centralize conversation history --- codex-rs/core/src/codex.rs | 13 +------------ codex-rs/core/src/codex/compact.rs | 14 +++++++++----- codex-rs/core/src/conversation_history.rs | 22 +++++++++++++++++++++- 3 files changed, 31 insertions(+), 18 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 0d706acb1f2..0da246d22f5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -987,17 +987,6 @@ impl Session { self.send_event(turn_context, event).await; } - // todo (aibrahim): we should always append to history. In a follow up PR, we should remove the need for this function. - /// Build the full turn input by concatenating the current conversation - /// history with additional items for this turn. - pub async fn turn_input_with_history(&self, extra: Vec) -> Vec { - let history = { - let mut state = self.state.lock().await; - state.history_snapshot() - }; - [history, extra].concat() - } - /// Returns the input if there was no task running to inject into pub async fn inject_input(&self, input: Vec) -> Result<(), Vec> { let mut active = self.active_turn.lock().await; @@ -1545,7 +1534,7 @@ pub(crate) async fn run_task( review_thread_history.clone() } else { sess.record_conversation_items(&pending_input).await; - sess.turn_input_with_history(pending_input).await + sess.history_snapshot().await }; let turn_input_messages: Vec = turn_input diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index 353cf5333cd..f68bcd62fdb 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -5,6 +5,7 @@ use super::TurnContext; use super::get_last_assistant_message_from_turn; use crate::Prompt; use crate::client_common::ResponseEvent; +use crate::conversation_history::ConversationHistory; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::AgentMessageEvent; @@ -64,10 +65,12 @@ async fn run_compact_task_inner( input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - // TODO (aibrahim): this should be a history snapshot. - let mut turn_input = sess - .turn_input_with_history(vec![initial_input_for_turn.clone().into()]) - .await; + + let items = sess.history_snapshot().await; + + let mut history = ConversationHistory::create_with_items(items); + history.record_items(&[initial_input_for_turn.into()]); + let mut truncated_count = 0usize; let max_retries = turn_context.client.get_provider().stream_max_retries(); @@ -84,6 +87,7 @@ async fn run_compact_task_inner( sess.persist_rollout_items(&[rollout_item]).await; loop { + let turn_input = history.get_history(); let prompt = Prompt { input: turn_input.clone(), ..Default::default() @@ -108,7 +112,7 @@ async fn run_compact_task_inner( } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input.len() > 1 { - turn_input.remove(0); + history.remove_last_item(); truncated_count += 1; retries = 0; continue; diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index cb55e2a0ec0..bf638d7adfb 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -1,5 +1,6 @@ use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; +use tracing::error; /// Transcript of conversation history #[derive(Debug, Clone, Default)] @@ -13,6 +14,10 @@ impl ConversationHistory { Self { items: Vec::new() } } + pub(crate) fn create_with_items(items: Vec) -> Self { + Self { items } + } + /// `items` is ordered from oldest to newest. pub(crate) fn record_items(&mut self, items: I) where @@ -33,6 +38,11 @@ impl ConversationHistory { self.contents() } + pub(crate) fn remove_last_item(&mut self) { + self.items.pop(); + self.normalize_history(); + } + /// This function enforces a couple of invariants on the in-memory history: /// 1. every call (function/custom) has a corresponding output entry /// 2. every output has a corresponding call entry @@ -51,7 +61,6 @@ impl ConversationHistory { fn ensure_call_outputs_present(&mut self) { let mut missing_outputs_to_insert: Vec = Vec::new(); - for item in &self.items { match item { ResponseItem::FunctionCall { call_id, .. } => { @@ -63,6 +72,7 @@ impl ConversationHistory { }); if !has_output { + error!("Function call output is missing for call id: {}", call_id); missing_outputs_to_insert.push(ResponseItem::FunctionCallOutput { call_id: call_id.clone(), output: FunctionCallOutputPayload { @@ -81,6 +91,10 @@ impl ConversationHistory { }); if !has_output { + error!( + "Custom tool call output is missing for call id: {}", + call_id + ); missing_outputs_to_insert.push(ResponseItem::CustomToolCallOutput { call_id: call_id.clone(), output: "aborted".to_string(), @@ -90,6 +104,10 @@ impl ConversationHistory { // LocalShellCall is represented in upstream streams by a FunctionCallOutput ResponseItem::LocalShellCall { call_id, .. } => { if let Some(call_id) = call_id.as_ref() { + error!( + "Local shell call output is missing for call id: {}", + call_id + ); let has_output = self.items.iter().any(|i| match i { ResponseItem::FunctionCallOutput { call_id: existing, .. @@ -141,6 +159,7 @@ impl ConversationHistory { }); if !has_call { + error!("Function call is missing for call id: {}", call_id); orphan_output_call_ids.insert(call_id.clone()); } } @@ -153,6 +172,7 @@ impl ConversationHistory { }); if !has_call { + error!("Custom tool call is missing for call id: {}", call_id); orphan_output_call_ids.insert(call_id.clone()); } } From fc6f48d3a8f6c327f56aa8967861c8e7a7fafaa2 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 15:33:52 -0700 Subject: [PATCH 03/16] move review thread to conversation history --- codex-rs/core/src/codex.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 0da246d22f5..c7530509d89 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1489,12 +1489,13 @@ pub(crate) async fn run_task( // model sees a fresh conversation without the parent session's history. // For normal turns, continue recording to the session history as before. let is_review_mode = turn_context.is_review_mode; - // TODO:(aibrahim): review thread should be a conversation history type. - let mut review_thread_history: Vec = Vec::new(); + + let mut review_thread_history: ConversationHistory = ConversationHistory::new(); if is_review_mode { // Seed review threads with environment context so the model knows the working directory. - review_thread_history.extend(sess.build_initial_context(turn_context.as_ref())); - review_thread_history.push(initial_input_for_turn.into()); + review_thread_history + .record_items(sess.build_initial_context(turn_context.as_ref()).iter()); + review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into())); } else { sess.record_input_and_rollout_usermsg(&initial_input_for_turn) .await; @@ -1529,9 +1530,9 @@ pub(crate) async fn run_task( // represents an append-only log without duplicates. let turn_input: Vec = if is_review_mode { if !pending_input.is_empty() { - review_thread_history.extend(pending_input); + review_thread_history.record_items(&pending_input); } - review_thread_history.clone() + review_thread_history.get_history() } else { sess.record_conversation_items(&pending_input).await; sess.history_snapshot().await @@ -1672,7 +1673,7 @@ pub(crate) async fn run_task( if !items_to_record_in_conversation_history.is_empty() { if is_review_mode { review_thread_history - .extend(items_to_record_in_conversation_history.clone()); + .record_items(items_to_record_in_conversation_history.iter()); } else { sess.record_conversation_items(&items_to_record_in_conversation_history) .await; From ea1f063551c3561aa2b36bc52979cbc77c387a83 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 15:35:32 -0700 Subject: [PATCH 04/16] move review thread to conversation history --- codex-rs/core/src/codex/compact.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index f68bcd62fdb..a851451d768 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -25,6 +25,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; use futures::prelude::*; +use tracing::error; pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; @@ -112,6 +113,10 @@ async fn run_compact_task_inner( } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input.len() > 1 { + // ideally we shouldn't reach this point, but if we do, we should remove the last item to not break cache. + error!( + "Context window exceeded while compacting, removing last item to not break cache. Error: {e}" + ); history.remove_last_item(); truncated_count += 1; retries = 0; From f2864abe565b59c53f5868bd55f97260fb43d440 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 15:59:01 -0700 Subject: [PATCH 05/16] move review thread to conversation history --- codex-rs/core/src/codex/compact.rs | 6 +-- codex-rs/core/src/conversation_history.rs | 65 +++++++++++++++-------- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index a851451d768..2b78e6ef55f 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -113,11 +113,11 @@ async fn run_compact_task_inner( } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input.len() > 1 { - // ideally we shouldn't reach this point, but if we do, we should remove the last item to not break cache. + // Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact. error!( - "Context window exceeded while compacting, removing last item to not break cache. Error: {e}" + "Context window exceeded while compacting; removing oldest history item. Error: {e}" ); - history.remove_last_item(); + history.remove_first_item(); truncated_count += 1; retries = 0; continue; diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index bf638d7adfb..580766d5fa6 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -38,9 +38,8 @@ impl ConversationHistory { self.contents() } - pub(crate) fn remove_last_item(&mut self) { - self.items.pop(); - self.normalize_history(); + pub(crate) fn remove_first_item(&mut self) { + self.items.remove(0); } /// This function enforces a couple of invariants on the in-memory history: @@ -60,8 +59,12 @@ impl ConversationHistory { } fn ensure_call_outputs_present(&mut self) { - let mut missing_outputs_to_insert: Vec = Vec::new(); - for item in &self.items { + // Collect synthetic outputs to insert immediately after their calls. + // Store the insertion position (index of call) alongside the item so + // we can insert in reverse order and avoid index shifting. + let mut missing_outputs_to_insert: Vec<(usize, ResponseItem)> = Vec::new(); + + for (idx, item) in self.items.iter().enumerate() { match item { ResponseItem::FunctionCall { call_id, .. } => { let has_output = self.items.iter().any(|i| match i { @@ -73,13 +76,16 @@ impl ConversationHistory { if !has_output { error!("Function call output is missing for call id: {}", call_id); - missing_outputs_to_insert.push(ResponseItem::FunctionCallOutput { - call_id: call_id.clone(), - output: FunctionCallOutputPayload { - content: "aborted".to_string(), - success: None, + missing_outputs_to_insert.push(( + idx, + ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, }, - }); + )); } } ResponseItem::CustomToolCall { call_id, .. } => { @@ -95,10 +101,13 @@ impl ConversationHistory { "Custom tool call output is missing for call id: {}", call_id ); - missing_outputs_to_insert.push(ResponseItem::CustomToolCallOutput { - call_id: call_id.clone(), - output: "aborted".to_string(), - }); + missing_outputs_to_insert.push(( + idx, + ResponseItem::CustomToolCallOutput { + call_id: call_id.clone(), + output: "aborted".to_string(), + }, + )); } } // LocalShellCall is represented in upstream streams by a FunctionCallOutput @@ -116,13 +125,16 @@ impl ConversationHistory { }); if !has_output { - missing_outputs_to_insert.push(ResponseItem::FunctionCallOutput { - call_id: call_id.clone(), - output: FunctionCallOutputPayload { - content: "aborted".to_string(), - success: None, + missing_outputs_to_insert.push(( + idx, + ResponseItem::FunctionCallOutput { + call_id: call_id.clone(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, }, - }); + )); } } } @@ -138,7 +150,16 @@ impl ConversationHistory { } if !missing_outputs_to_insert.is_empty() { - self.items.extend(missing_outputs_to_insert); + // Insert from the end to avoid shifting subsequent indices. + missing_outputs_to_insert.sort_by_key(|(i, _)| *i); + for (idx, item) in missing_outputs_to_insert.into_iter().rev() { + let insert_pos = idx + 1; // place immediately after the call + if insert_pos <= self.items.len() { + self.items.insert(insert_pos, item); + } else { + self.items.push(item); + } + } } } From 51cdc46a4e5f3aa21832da250c99e4e6a842ad5f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 16:08:51 -0700 Subject: [PATCH 06/16] normalize after remove --- codex-rs/core/src/conversation_history.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index 580766d5fa6..c10086710f5 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -40,6 +40,7 @@ impl ConversationHistory { pub(crate) fn remove_first_item(&mut self) { self.items.remove(0); + self.normalize_history(); } /// This function enforces a couple of invariants on the in-memory history: From 2e8dc6b60a2ad33e9b1c70eee09a1a7679c4f249 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 16:19:11 -0700 Subject: [PATCH 07/16] fix --- codex-rs/core/src/conversation_history.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index c10086710f5..cfc714724a6 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -114,10 +114,6 @@ impl ConversationHistory { // LocalShellCall is represented in upstream streams by a FunctionCallOutput ResponseItem::LocalShellCall { call_id, .. } => { if let Some(call_id) = call_id.as_ref() { - error!( - "Local shell call output is missing for call id: {}", - call_id - ); let has_output = self.items.iter().any(|i| match i { ResponseItem::FunctionCallOutput { call_id: existing, .. @@ -126,6 +122,10 @@ impl ConversationHistory { }); if !has_output { + error!( + "Local shell call output is missing for call id: {}", + call_id + ); missing_outputs_to_insert.push(( idx, ResponseItem::FunctionCallOutput { @@ -177,6 +177,10 @@ impl ConversationHistory { ResponseItem::FunctionCall { call_id: existing, .. } => existing == call_id, + ResponseItem::LocalShellCall { + call_id: Some(existing), + .. + } => existing == call_id, _ => false, }); From 4300b719f9a0f5c2dac8a6d84c5e3094227542bc Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 16:32:22 -0700 Subject: [PATCH 08/16] fix test --- codex-rs/core/tests/suite/abort_tasks.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index c03a300bd1d..dd007c2470c 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -183,9 +183,12 @@ async fn interrupt_tool_records_history_entries() { } } - assert!(call_seen, "function call not recorded in responses payload"); assert!( - abort_seen, - "aborted function call output not recorded in responses payload" + !call_seen, + "function call should not be recorded in responses payload" + ); + assert!( + !abort_seen, + "aborted function call output should not be recorded in responses payload" ); } From f68bb2e69909e0c384454308ac65113c8e381ad0 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 16:51:15 -0700 Subject: [PATCH 09/16] remove tests --- codex-rs/core/tests/suite/abort_tasks.rs | 125 ----------------------- codex-rs/core/tests/suite/tools.rs | 56 ---------- 2 files changed, 181 deletions(-) diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index dd007c2470c..dcc65fc7420 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Duration; use codex_core::protocol::EventMsg; @@ -6,9 +5,7 @@ use codex_core::protocol::Op; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; -use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_sse_once; -use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::responses::start_mock_server; use core_test_support::test_codex::test_codex; @@ -70,125 +67,3 @@ async fn interrupt_long_running_tool_emits_turn_aborted() { ) .await; } - -/// After an interrupt we expect the next request to the model to include both -/// the original tool call and an `"aborted"` `function_call_output`. This test -/// exercises the follow-up flow: it sends another user turn, inspects the mock -/// responses server, and ensures the model receives the synthesized abort. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn interrupt_tool_records_history_entries() { - let command = vec![ - "bash".to_string(), - "-lc".to_string(), - "sleep 60".to_string(), - ]; - let call_id = "call-history"; - - let args = json!({ - "command": command, - "timeout_ms": 60_000 - }) - .to_string(); - let first_body = sse(vec![ - ev_response_created("resp-history"), - ev_function_call(call_id, "shell", &args), - ev_completed("resp-history"), - ]); - let follow_up_body = sse(vec![ - ev_response_created("resp-followup"), - ev_completed("resp-followup"), - ]); - - let server = start_mock_server().await; - let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await; - - let fixture = test_codex().build(&server).await.unwrap(); - let codex = Arc::clone(&fixture.codex); - - let wait_timeout = Duration::from_secs(5); - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "start history recording".into(), - }], - }) - .await - .unwrap(); - - wait_for_event_with_timeout( - &codex, - |ev| matches!(ev, EventMsg::ExecCommandBegin(_)), - wait_timeout, - ) - .await; - - codex.submit(Op::Interrupt).await.unwrap(); - - wait_for_event_with_timeout( - &codex, - |ev| matches!(ev, EventMsg::TurnAborted(_)), - wait_timeout, - ) - .await; - - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: "follow up".into(), - }], - }) - .await - .unwrap(); - - wait_for_event_with_timeout( - &codex, - |ev| matches!(ev, EventMsg::TaskComplete(_)), - wait_timeout, - ) - .await; - - let requests = response_mock.requests(); - assert!( - requests.len() >= 2, - "expected at least two calls to the responses API" - ); - - let mut call_seen = false; - let mut abort_seen = false; - - for request in requests { - let input = request.input(); - for window in input.windows(2) { - let current = &window[0]; - let next = &window[1]; - if current.get("type").and_then(|v| v.as_str()) == Some("function_call") - && current.get("call_id").and_then(|v| v.as_str()) == Some(call_id) - { - call_seen = true; - if next.get("type").and_then(|v| v.as_str()) == Some("function_call_output") - && next.get("call_id").and_then(|v| v.as_str()) == Some(call_id) - { - let content_matches = - next.get("output").and_then(serde_json::Value::as_str) == Some("aborted"); - if content_matches { - abort_seen = true; - break; - } - } - } - } - if call_seen && abort_seen { - break; - } - } - - assert!( - !call_seen, - "function call should not be recorded in responses payload" - ); - assert!( - !abort_seen, - "aborted function call output should not be recorded in responses payload" - ); -} diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index 46dd0ba8113..1859ab9e215 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -227,62 +227,6 @@ async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn local_shell_missing_ids_maps_to_function_output_error() -> Result<()> { - skip_if_no_network!(Ok(())); - - let server = start_mock_server().await; - let mut builder = test_codex(); - let test = builder.build(&server).await?; - - let local_shell_event = json!({ - "type": "response.output_item.done", - "item": { - "type": "local_shell_call", - "status": "completed", - "action": { - "type": "exec", - "command": ["/bin/echo", "hi"], - } - } - }); - - mount_sse_once( - &server, - sse(vec![ - ev_response_created("resp-1"), - local_shell_event, - ev_completed("resp-1"), - ]), - ) - .await; - let second_mock = mount_sse_once( - &server, - sse(vec![ - ev_assistant_message("msg-1", "done"), - ev_completed("resp-2"), - ]), - ) - .await; - - submit_turn( - &test, - "check shell output", - AskForApproval::Never, - SandboxPolicy::DangerFullAccess, - ) - .await?; - - let item = second_mock.single_request().function_call_output(""); - assert_eq!(item.get("call_id").and_then(Value::as_str), Some("")); - assert_eq!( - item.get("output").and_then(Value::as_str), - Some("LocalShellCall without call_id or id"), - ); - - Ok(()) -} - async fn collect_tools(use_unified_exec: bool) -> Result> { let server = start_mock_server().await; From de5907ee59f917bc7d31773ae722109c0e01d1fe Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 21 Oct 2025 21:06:52 -0700 Subject: [PATCH 10/16] tests --- codex-rs/core/tests/suite/tools.rs | 64 ++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index 1859ab9e215..6b2e25e0e7b 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -227,6 +227,70 @@ async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn local_shell_missing_ids_maps_to_function_output_error() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex(); + let test = builder.build(&server).await?; + + let local_shell_event = json!({ + "type": "response.output_item.done", + "item": { + "type": "local_shell_call", + "status": "completed", + "action": { + "type": "exec", + "command": ["/bin/echo", "hi"], + } + } + }); + + mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + local_shell_event, + ev_completed("resp-1"), + ]), + ) + .await; + let second_mock = mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ) + .await; + + submit_turn( + &test, + "check shell output", + AskForApproval::Never, + SandboxPolicy::DangerFullAccess, + ) + .await?; + + // With normalized history, orphan function_call_output (no matching call) + // should not be included in the next request payload. + let req = second_mock.single_request(); + let input = req.input(); + let has_orphan = input.iter().any(|item| { + item.get("type").and_then(Value::as_str) == Some("function_call_output") + && item.get("call_id").and_then(Value::as_str) == Some("") + && item.get("output").and_then(Value::as_str) + == Some("LocalShellCall without call_id or id") + }); + assert!( + !has_orphan, + "orphan function_call_output with empty call_id should be dropped" + ); + + Ok(()) +} + async fn collect_tools(use_unified_exec: bool) -> Result> { let server = start_mock_server().await; From 7685a6194e3dd1269f09a89e73cc402512bc085e Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 10:07:28 -0700 Subject: [PATCH 11/16] feedback --- codex-rs/core/src/conversation_history.rs | 563 +++++++++++++++++++++- 1 file changed, 550 insertions(+), 13 deletions(-) diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index cfc714724a6..907b2741253 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -39,8 +39,12 @@ impl ConversationHistory { } pub(crate) fn remove_first_item(&mut self) { - self.items.remove(0); - self.normalize_history(); + if let Some(removed) = self.items.pop() { + // If the removed item participates in a call/output pair, also remove + // its corresponding counterpart to keep the invariants intact without + // running a full normalization pass. + self.remove_corresponding_for(&removed); + } } /// This function enforces a couple of invariants on the in-memory history: @@ -76,7 +80,9 @@ impl ConversationHistory { }); if !has_output { - error!("Function call output is missing for call id: {}", call_id); + error_or_panic(format!( + "Function call output is missing for call id: {call_id}" + )); missing_outputs_to_insert.push(( idx, ResponseItem::FunctionCallOutput { @@ -98,10 +104,9 @@ impl ConversationHistory { }); if !has_output { - error!( - "Custom tool call output is missing for call id: {}", - call_id - ); + error_or_panic(format!( + "Custom tool call output is missing for call id: {call_id}" + )); missing_outputs_to_insert.push(( idx, ResponseItem::CustomToolCallOutput { @@ -122,10 +127,9 @@ impl ConversationHistory { }); if !has_output { - error!( - "Local shell call output is missing for call id: {}", - call_id - ); + error_or_panic(format!( + "Local shell call output is missing for call id: {call_id}" + )); missing_outputs_to_insert.push(( idx, ResponseItem::FunctionCallOutput { @@ -185,7 +189,7 @@ impl ConversationHistory { }); if !has_call { - error!("Function call is missing for call id: {}", call_id); + error_or_panic(format!("Function call is missing for call id: {call_id}")); orphan_output_call_ids.insert(call_id.clone()); } } @@ -198,7 +202,9 @@ impl ConversationHistory { }); if !has_call { - error!("Custom tool call is missing for call id: {}", call_id); + error_or_panic(format!( + "Custom tool call is missing for call id: {call_id}" + )); orphan_output_call_ids.insert(call_id.clone()); } } @@ -227,6 +233,84 @@ impl ConversationHistory { pub(crate) fn replace(&mut self, items: Vec) { self.items = items; } + + /// Removes the corresponding paired item for the provided `item`, if any. + /// + /// Pairs: + /// - FunctionCall <-> FunctionCallOutput + /// - CustomToolCall <-> CustomToolCallOutput + /// - LocalShellCall(call_id: Some) <-> FunctionCallOutput + fn remove_corresponding_for(&mut self, item: &ResponseItem) { + match item { + ResponseItem::FunctionCall { call_id, .. } => { + self.remove_first_matching(|i| match i { + ResponseItem::FunctionCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + } + ResponseItem::CustomToolCall { call_id, .. } => { + self.remove_first_matching(|i| match i { + ResponseItem::CustomToolCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + } + ResponseItem::LocalShellCall { + call_id: Some(call_id), + .. + } => { + self.remove_first_matching(|i| match i { + ResponseItem::FunctionCallOutput { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + } + ResponseItem::FunctionCallOutput { call_id, .. } => { + self.remove_first_matching(|i| match i { + ResponseItem::FunctionCall { + call_id: existing, .. + } => existing == call_id, + ResponseItem::LocalShellCall { + call_id: Some(existing), + .. + } => existing == call_id, + _ => false, + }); + } + ResponseItem::CustomToolCallOutput { call_id, .. } => { + self.remove_first_matching(|i| match i { + ResponseItem::CustomToolCall { + call_id: existing, .. + } => existing == call_id, + _ => false, + }); + } + _ => {} + } + } + + /// Remove the first item matching the predicate. + fn remove_first_matching(&mut self, predicate: F) + where + F: FnMut(&ResponseItem) -> bool, + { + if let Some(pos) = self.items.iter().position(predicate) { + self.items.remove(pos); + } + } +} + +#[inline] +fn error_or_panic(message: String) { + if cfg!(debug_assertions) || env!("CARGO_PKG_VERSION").contains("alpha") { + panic!("{message}"); + } else { + error!("{message}"); + } } /// Anything that is not a system message or "reasoning" message is considered @@ -249,6 +333,11 @@ fn is_api_message(message: &ResponseItem) -> bool { mod tests { use super::*; use codex_protocol::models::ContentItem; + use codex_protocol::models::FunctionCallOutputPayload; + use codex_protocol::models::LocalShellAction; + use codex_protocol::models::LocalShellExecAction; + use codex_protocol::models::LocalShellStatus; + use pretty_assertions::assert_eq; fn assistant_msg(text: &str) -> ResponseItem { ResponseItem::Message { @@ -309,4 +398,452 @@ mod tests { ] ); } + + #[test] + fn remove_first_item_removes_matching_output_for_function_call() { + let items = vec![ + ResponseItem::FunctionCall { + id: None, + name: "do_it".to_string(), + arguments: "{}".to_string(), + call_id: "call-1".to_string(), + }, + ResponseItem::FunctionCallOutput { + call_id: "call-1".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + h.remove_first_item(); + assert_eq!(h.contents(), Vec::::new()); + } + + #[test] + fn remove_first_item_removes_matching_call_for_output() { + let items = vec![ + ResponseItem::FunctionCallOutput { + call_id: "call-2".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }, + ResponseItem::FunctionCall { + id: None, + name: "do_it".to_string(), + arguments: "{}".to_string(), + call_id: "call-2".to_string(), + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + h.remove_first_item(); + assert_eq!(h.contents(), Vec::::new()); + } + + #[test] + fn remove_first_item_handles_local_shell_pair() { + let items = vec![ + ResponseItem::LocalShellCall { + id: None, + call_id: Some("call-3".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string(), "hi".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }, + ResponseItem::FunctionCallOutput { + call_id: "call-3".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + h.remove_first_item(); + assert_eq!(h.contents(), Vec::::new()); + } + + #[test] + fn remove_first_item_handles_custom_tool_pair() { + let items = vec![ + ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "tool-1".to_string(), + name: "my_tool".to_string(), + input: "{}".to_string(), + }, + ResponseItem::CustomToolCallOutput { + call_id: "tool-1".to_string(), + output: "ok".to_string(), + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + h.remove_first_item(); + assert_eq!(h.contents(), Vec::::new()); + } + + //TODO(aibrahim): run CI in release mode. + #[cfg(not(debug_assertions))] + #[test] + fn normalize_adds_missing_output_for_function_call() { + let items = vec![ResponseItem::FunctionCall { + id: None, + name: "do_it".to_string(), + arguments: "{}".to_string(), + call_id: "call-x".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!( + h.contents(), + vec![ + ResponseItem::FunctionCall { + id: None, + name: "do_it".to_string(), + arguments: "{}".to_string(), + call_id: "call-x".to_string(), + }, + ResponseItem::FunctionCallOutput { + call_id: "call-x".to_string(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }, + ] + ); + } + + #[cfg(not(debug_assertions))] + #[test] + fn normalize_adds_missing_output_for_custom_tool_call() { + let items = vec![ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "tool-x".to_string(), + name: "custom".to_string(), + input: "{}".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!( + h.contents(), + vec![ + ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "tool-x".to_string(), + name: "custom".to_string(), + input: "{}".to_string(), + }, + ResponseItem::CustomToolCallOutput { + call_id: "tool-x".to_string(), + output: "aborted".to_string(), + }, + ] + ); + } + + #[cfg(not(debug_assertions))] + #[test] + fn normalize_adds_missing_output_for_local_shell_call_with_id() { + let items = vec![ResponseItem::LocalShellCall { + id: None, + call_id: Some("shell-1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string(), "hi".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!( + h.contents(), + vec![ + ResponseItem::LocalShellCall { + id: None, + call_id: Some("shell-1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string(), "hi".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }, + ResponseItem::FunctionCallOutput { + call_id: "shell-1".to_string(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }, + ] + ); + } + + #[cfg(not(debug_assertions))] + #[test] + fn normalize_removes_orphan_function_call_output() { + let items = vec![ResponseItem::FunctionCallOutput { + call_id: "orphan-1".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!(h.contents(), Vec::::new()); + } + + #[cfg(not(debug_assertions))] + #[test] + fn normalize_removes_orphan_custom_tool_call_output() { + let items = vec![ResponseItem::CustomToolCallOutput { + call_id: "orphan-2".to_string(), + output: "ok".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!(h.contents(), Vec::::new()); + } + + #[cfg(not(debug_assertions))] + #[test] + fn normalize_mixed_inserts_and_removals() { + let items = vec![ + // Will get an inserted output + ResponseItem::FunctionCall { + id: None, + name: "f1".to_string(), + arguments: "{}".to_string(), + call_id: "c1".to_string(), + }, + // Orphan output that should be removed + ResponseItem::FunctionCallOutput { + call_id: "c2".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }, + // Will get an inserted custom tool output + ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "t1".to_string(), + name: "tool".to_string(), + input: "{}".to_string(), + }, + // Local shell call also gets an inserted function call output + ResponseItem::LocalShellCall { + id: None, + call_id: Some("s1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + + h.normalize_history(); + + assert_eq!( + h.contents(), + vec![ + ResponseItem::FunctionCall { + id: None, + name: "f1".to_string(), + arguments: "{}".to_string(), + call_id: "c1".to_string(), + }, + ResponseItem::FunctionCallOutput { + call_id: "c1".to_string(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }, + ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "t1".to_string(), + name: "tool".to_string(), + input: "{}".to_string(), + }, + ResponseItem::CustomToolCallOutput { + call_id: "t1".to_string(), + output: "aborted".to_string(), + }, + ResponseItem::LocalShellCall { + id: None, + call_id: Some("s1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }, + ResponseItem::FunctionCallOutput { + call_id: "s1".to_string(), + output: FunctionCallOutputPayload { + content: "aborted".to_string(), + success: None, + }, + }, + ] + ); + } + + // In debug builds we panic on normalization errors instead of silently fixing them. + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_adds_missing_output_for_function_call_panics_in_debug() { + let items = vec![ResponseItem::FunctionCall { + id: None, + name: "do_it".to_string(), + arguments: "{}".to_string(), + call_id: "call-x".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } + + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_adds_missing_output_for_custom_tool_call_panics_in_debug() { + let items = vec![ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "tool-x".to_string(), + name: "custom".to_string(), + input: "{}".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } + + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_adds_missing_output_for_local_shell_call_with_id_panics_in_debug() { + let items = vec![ResponseItem::LocalShellCall { + id: None, + call_id: Some("shell-1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string(), "hi".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } + + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_removes_orphan_function_call_output_panics_in_debug() { + let items = vec![ResponseItem::FunctionCallOutput { + call_id: "orphan-1".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } + + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_removes_orphan_custom_tool_call_output_panics_in_debug() { + let items = vec![ResponseItem::CustomToolCallOutput { + call_id: "orphan-2".to_string(), + output: "ok".to_string(), + }]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } + + #[cfg(debug_assertions)] + #[test] + #[should_panic] + fn normalize_mixed_inserts_and_removals_panics_in_debug() { + let items = vec![ + ResponseItem::FunctionCall { + id: None, + name: "f1".to_string(), + arguments: "{}".to_string(), + call_id: "c1".to_string(), + }, + ResponseItem::FunctionCallOutput { + call_id: "c2".to_string(), + output: FunctionCallOutputPayload { + content: "ok".to_string(), + success: None, + }, + }, + ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "t1".to_string(), + name: "tool".to_string(), + input: "{}".to_string(), + }, + ResponseItem::LocalShellCall { + id: None, + call_id: Some("s1".to_string()), + status: LocalShellStatus::Completed, + action: LocalShellAction::Exec(LocalShellExecAction { + command: vec!["echo".to_string()], + timeout_ms: None, + working_directory: None, + env: None, + user: None, + }), + }, + ]; + let mut h = ConversationHistory::create_with_items(items); + h.normalize_history(); + } } From 2676bbfee58c6c0a9d0f46e425284f098939d406 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 11:02:34 -0700 Subject: [PATCH 12/16] tests --- codex-rs/core/src/conversation_history.rs | 5 +- codex-rs/core/tests/common/responses.rs | 94 +++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index 907b2741253..d2c72c3996e 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -39,7 +39,10 @@ impl ConversationHistory { } pub(crate) fn remove_first_item(&mut self) { - if let Some(removed) = self.items.pop() { + if !self.items.is_empty() { + // Remove the oldest item (front of the list). Items are ordered from + // oldest → newest, so index 0 is the first entry recorded. + let removed = self.items.remove(0); // If the removed item participates in a call/output pair, also remove // its corresponding counterpart to keep the invariants intact without // running a full normalization pass. diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index a8a777ae4fc..ecb51ba805a 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -97,6 +97,10 @@ impl Match for ResponseMock { .lock() .unwrap() .push(ResponsesRequest(request.clone())); + + // Enforce invariant checks on every request body captured by the mock. + // Panic on orphan tool outputs or calls to catch regressions early. + validate_request_body_invariants(request); true } } @@ -336,3 +340,93 @@ pub async fn mount_sse_sequence(server: &MockServer, bodies: Vec) -> Res response_mock } + +/// Validate invariants on the request body sent to `/v1/responses`. +/// +/// - No `function_call_output`/`custom_tool_call_output` with missing/empty `call_id`. +/// - Every `function_call_output` must match a prior `function_call` or +/// `local_shell_call` with the same `call_id` in the same `input`. +/// - Every `custom_tool_call_output` must match a prior `custom_tool_call`. +/// - Additionally, enforce symmetry: every `function_call`/`custom_tool_call` +/// in the `input` must have a matching output entry. +fn validate_request_body_invariants(request: &wiremock::Request) { + let Ok(body): Result = request.body_json() else { + return; + }; + let Some(items) = body.get("input").and_then(Value::as_array) else { + return; + }; + + use std::collections::HashSet; + let mut function_calls: HashSet = HashSet::new(); + let mut custom_tool_calls: HashSet = HashSet::new(); + let mut local_shell_calls: HashSet = HashSet::new(); + let mut function_call_outputs: HashSet = HashSet::new(); + let mut custom_tool_call_outputs: HashSet = HashSet::new(); + + // First pass: collect call ids and assert outputs have non-empty call_id. + for item in items { + match item.get("type").and_then(Value::as_str) { + Some("function_call") => { + if let Some(id) = item.get("call_id").and_then(Value::as_str) { + if !id.is_empty() { + function_calls.insert(id.to_string()); + } + } + } + Some("custom_tool_call") => { + if let Some(id) = item.get("call_id").and_then(Value::as_str) { + if !id.is_empty() { + custom_tool_calls.insert(id.to_string()); + } + } + } + Some("local_shell_call") => { + if let Some(id) = item.get("call_id").and_then(Value::as_str) { + if !id.is_empty() { + local_shell_calls.insert(id.to_string()); + } + } + } + Some("function_call_output") => { + let call_id = item.get("call_id").and_then(Value::as_str); + if call_id.is_none() || call_id == Some("") { + panic!("orphan function_call_output with empty call_id should be dropped"); + } + function_call_outputs.insert(call_id.unwrap().to_string()); + } + Some("custom_tool_call_output") => { + let call_id = item.get("call_id").and_then(Value::as_str); + if call_id.is_none() || call_id == Some("") { + panic!("orphan custom_tool_call_output with empty call_id should be dropped"); + } + custom_tool_call_outputs.insert(call_id.unwrap().to_string()); + } + _ => {} + } + } + + // Second pass: ensure outputs refer to existing calls within the same input. + for cid in &function_call_outputs { + if !function_calls.contains(cid) && !local_shell_calls.contains(cid) { + panic!("function_call_output without matching call in input: {cid}"); + } + } + for cid in &custom_tool_call_outputs { + if !custom_tool_calls.contains(cid) { + panic!("custom_tool_call_output without matching call in input: {cid}"); + } + } + + // Symmetry: each function/custom call must have an output. + for cid in &function_calls { + if !function_call_outputs.contains(cid) { + panic!("Function call output is missing for call id: {cid}"); + } + } + for cid in &custom_tool_calls { + if !custom_tool_call_outputs.contains(cid) { + panic!("Custom tool call output is missing for call id: {cid}"); + } + } +} From e50fe9d0b47cc12393025976c2d7599860119ae8 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 11:22:47 -0700 Subject: [PATCH 13/16] clippy --- codex-rs/core/tests/common/responses.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index ecb51ba805a..03bbd953680 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -368,24 +368,24 @@ fn validate_request_body_invariants(request: &wiremock::Request) { for item in items { match item.get("type").and_then(Value::as_str) { Some("function_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) { - if !id.is_empty() { - function_calls.insert(id.to_string()); - } + if let Some(id) = item.get("call_id").and_then(Value::as_str) + && !id.is_empty() + { + function_calls.insert(id.to_string()); } } Some("custom_tool_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) { - if !id.is_empty() { - custom_tool_calls.insert(id.to_string()); - } + if let Some(id) = item.get("call_id").and_then(Value::as_str) + && !id.is_empty() + { + custom_tool_calls.insert(id.to_string()); } } Some("local_shell_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) { - if !id.is_empty() { - local_shell_calls.insert(id.to_string()); - } + if let Some(id) = item.get("call_id").and_then(Value::as_str) + && !id.is_empty() + { + local_shell_calls.insert(id.to_string()); } } Some("function_call_output") => { From 812538d24be4bf3931a9337eae2ee661bc745be5 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 11:26:38 -0700 Subject: [PATCH 14/16] small change in validate_request_body_invariants --- codex-rs/core/tests/common/responses.rs | 117 ++++++++++++------------ 1 file changed, 57 insertions(+), 60 deletions(-) diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 03bbd953680..600a82d7cfd 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -358,75 +358,72 @@ fn validate_request_body_invariants(request: &wiremock::Request) { }; use std::collections::HashSet; - let mut function_calls: HashSet = HashSet::new(); - let mut custom_tool_calls: HashSet = HashSet::new(); - let mut local_shell_calls: HashSet = HashSet::new(); - let mut function_call_outputs: HashSet = HashSet::new(); - let mut custom_tool_call_outputs: HashSet = HashSet::new(); - - // First pass: collect call ids and assert outputs have non-empty call_id. - for item in items { - match item.get("type").and_then(Value::as_str) { - Some("function_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) - && !id.is_empty() - { - function_calls.insert(id.to_string()); - } - } - Some("custom_tool_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) - && !id.is_empty() - { - custom_tool_calls.insert(id.to_string()); - } - } - Some("local_shell_call") => { - if let Some(id) = item.get("call_id").and_then(Value::as_str) - && !id.is_empty() - { - local_shell_calls.insert(id.to_string()); - } - } - Some("function_call_output") => { - let call_id = item.get("call_id").and_then(Value::as_str); - if call_id.is_none() || call_id == Some("") { - panic!("orphan function_call_output with empty call_id should be dropped"); - } - function_call_outputs.insert(call_id.unwrap().to_string()); - } - Some("custom_tool_call_output") => { - let call_id = item.get("call_id").and_then(Value::as_str); - if call_id.is_none() || call_id == Some("") { - panic!("orphan custom_tool_call_output with empty call_id should be dropped"); - } - custom_tool_call_outputs.insert(call_id.unwrap().to_string()); - } - _ => {} - } + + fn get_call_id(item: &Value) -> Option<&str> { + item.get("call_id") + .and_then(Value::as_str) + .filter(|id| !id.is_empty()) + } + + fn gather_ids(items: &[Value], kind: &str) -> HashSet { + items + .iter() + .filter(|item| item.get("type").and_then(Value::as_str) == Some(kind)) + .filter_map(get_call_id) + .map(str::to_string) + .collect() + } + + fn gather_output_ids(items: &[Value], kind: &str, missing_msg: &str) -> HashSet { + items + .iter() + .filter(|item| item.get("type").and_then(Value::as_str) == Some(kind)) + .map(|item| { + let Some(id) = get_call_id(item) else { + panic!("{missing_msg}"); + }; + id.to_string() + }) + .collect() } - // Second pass: ensure outputs refer to existing calls within the same input. + let function_calls = gather_ids(items, "function_call"); + let custom_tool_calls = gather_ids(items, "custom_tool_call"); + let local_shell_calls = gather_ids(items, "local_shell_call"); + let function_call_outputs = gather_output_ids( + items, + "function_call_output", + "orphan function_call_output with empty call_id should be dropped", + ); + let custom_tool_call_outputs = gather_output_ids( + items, + "custom_tool_call_output", + "orphan custom_tool_call_output with empty call_id should be dropped", + ); + for cid in &function_call_outputs { - if !function_calls.contains(cid) && !local_shell_calls.contains(cid) { - panic!("function_call_output without matching call in input: {cid}"); - } + assert!( + function_calls.contains(cid) || local_shell_calls.contains(cid), + "function_call_output without matching call in input: {cid}", + ); } for cid in &custom_tool_call_outputs { - if !custom_tool_calls.contains(cid) { - panic!("custom_tool_call_output without matching call in input: {cid}"); - } + assert!( + custom_tool_calls.contains(cid), + "custom_tool_call_output without matching call in input: {cid}", + ); } - // Symmetry: each function/custom call must have an output. for cid in &function_calls { - if !function_call_outputs.contains(cid) { - panic!("Function call output is missing for call id: {cid}"); - } + assert!( + function_call_outputs.contains(cid), + "Function call output is missing for call id: {cid}", + ); } for cid in &custom_tool_calls { - if !custom_tool_call_outputs.contains(cid) { - panic!("Custom tool call output is missing for call id: {cid}"); - } + assert!( + custom_tool_call_outputs.contains(cid), + "Custom tool call output is missing for call id: {cid}", + ); } } From d83544e1a9b7901dc17f39863bca716acd371125 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 11:48:29 -0700 Subject: [PATCH 15/16] remove test --- codex-rs/core/tests/suite/tools.rs | 64 ------------------------------ 1 file changed, 64 deletions(-) diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index 6b2e25e0e7b..1859ab9e215 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -227,70 +227,6 @@ async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> { Ok(()) } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn local_shell_missing_ids_maps_to_function_output_error() -> Result<()> { - skip_if_no_network!(Ok(())); - - let server = start_mock_server().await; - let mut builder = test_codex(); - let test = builder.build(&server).await?; - - let local_shell_event = json!({ - "type": "response.output_item.done", - "item": { - "type": "local_shell_call", - "status": "completed", - "action": { - "type": "exec", - "command": ["/bin/echo", "hi"], - } - } - }); - - mount_sse_once( - &server, - sse(vec![ - ev_response_created("resp-1"), - local_shell_event, - ev_completed("resp-1"), - ]), - ) - .await; - let second_mock = mount_sse_once( - &server, - sse(vec![ - ev_assistant_message("msg-1", "done"), - ev_completed("resp-2"), - ]), - ) - .await; - - submit_turn( - &test, - "check shell output", - AskForApproval::Never, - SandboxPolicy::DangerFullAccess, - ) - .await?; - - // With normalized history, orphan function_call_output (no matching call) - // should not be included in the next request payload. - let req = second_mock.single_request(); - let input = req.input(); - let has_orphan = input.iter().any(|item| { - item.get("type").and_then(Value::as_str) == Some("function_call_output") - && item.get("call_id").and_then(Value::as_str) == Some("") - && item.get("output").and_then(Value::as_str) - == Some("LocalShellCall without call_id or id") - }); - assert!( - !has_orphan, - "orphan function_call_output with empty call_id should be dropped" - ); - - Ok(()) -} - async fn collect_tools(use_unified_exec: bool) -> Result> { let server = start_mock_server().await; From 0a55da1ff2a650b52c7a6ba4b4ddbbf35ff4ead0 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 22 Oct 2025 12:06:50 -0700 Subject: [PATCH 16/16] feedback --- codex-rs/core/src/codex.rs | 6 +++ codex-rs/core/src/codex/compact.rs | 5 +- codex-rs/core/src/conversation_history.rs | 56 ++++++++++++----------- codex-rs/core/src/state/session.rs | 4 ++ codex-rs/core/tests/common/responses.rs | 2 +- 5 files changed, 41 insertions(+), 32 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index c7530509d89..36af7c7fb46 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -889,11 +889,17 @@ impl Session { } } + // todo (aibrahim): get rid of this method. we shouldn't deal with vec[resposne_item] and rather use ConversationHistory. pub(crate) async fn history_snapshot(&self) -> Vec { let mut state = self.state.lock().await; state.history_snapshot() } + pub(crate) async fn clone_history(&self) -> ConversationHistory { + let state = self.state.lock().await; + state.clone_history() + } + async fn update_token_usage_info( &self, turn_context: &TurnContext, diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index 2b78e6ef55f..5e4db225e52 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -5,7 +5,6 @@ use super::TurnContext; use super::get_last_assistant_message_from_turn; use crate::Prompt; use crate::client_common::ResponseEvent; -use crate::conversation_history::ConversationHistory; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::protocol::AgentMessageEvent; @@ -67,9 +66,7 @@ async fn run_compact_task_inner( ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - let items = sess.history_snapshot().await; - - let mut history = ConversationHistory::create_with_items(items); + let mut history = sess.clone_history().await; history.record_items(&[initial_input_for_turn.into()]); let mut truncated_count = 0usize; diff --git a/codex-rs/core/src/conversation_history.rs b/codex-rs/core/src/conversation_history.rs index d2c72c3996e..93234a49363 100644 --- a/codex-rs/core/src/conversation_history.rs +++ b/codex-rs/core/src/conversation_history.rs @@ -14,10 +14,6 @@ impl ConversationHistory { Self { items: Vec::new() } } - pub(crate) fn create_with_items(items: Vec) -> Self { - Self { items } - } - /// `items` is ordered from oldest to newest. pub(crate) fn record_items(&mut self, items: I) where @@ -53,7 +49,7 @@ impl ConversationHistory { /// This function enforces a couple of invariants on the in-memory history: /// 1. every call (function/custom) has a corresponding output entry /// 2. every output has a corresponding call entry - pub(crate) fn normalize_history(&mut self) { + fn normalize_history(&mut self) { // all function/tool calls must have a corresponding output self.ensure_call_outputs_present(); @@ -352,6 +348,12 @@ mod tests { } } + fn create_history_with_items(items: Vec) -> ConversationHistory { + let mut h = ConversationHistory::new(); + h.record_items(items.iter()); + h + } + fn user_msg(text: &str) -> ResponseItem { ResponseItem::Message { id: None, @@ -419,9 +421,9 @@ mod tests { }, }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.remove_first_item(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } #[test] @@ -441,9 +443,9 @@ mod tests { call_id: "call-2".to_string(), }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.remove_first_item(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } #[test] @@ -469,9 +471,9 @@ mod tests { }, }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.remove_first_item(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } #[test] @@ -489,9 +491,9 @@ mod tests { output: "ok".to_string(), }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.remove_first_item(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } //TODO(aibrahim): run CI in release mode. @@ -504,7 +506,7 @@ mod tests { arguments: "{}".to_string(), call_id: "call-x".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); @@ -538,7 +540,7 @@ mod tests { name: "custom".to_string(), input: "{}".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); @@ -575,7 +577,7 @@ mod tests { user: None, }), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); @@ -615,11 +617,11 @@ mod tests { success: None, }, }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } #[cfg(not(debug_assertions))] @@ -629,11 +631,11 @@ mod tests { call_id: "orphan-2".to_string(), output: "ok".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); - assert_eq!(h.contents(), Vec::::new()); + assert_eq!(h.contents(), vec![]); } #[cfg(not(debug_assertions))] @@ -677,7 +679,7 @@ mod tests { }), }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); @@ -742,7 +744,7 @@ mod tests { arguments: "{}".to_string(), call_id: "call-x".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } @@ -757,7 +759,7 @@ mod tests { name: "custom".to_string(), input: "{}".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } @@ -777,7 +779,7 @@ mod tests { user: None, }), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } @@ -792,7 +794,7 @@ mod tests { success: None, }, }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } @@ -804,7 +806,7 @@ mod tests { call_id: "orphan-2".to_string(), output: "ok".to_string(), }]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } @@ -846,7 +848,7 @@ mod tests { }), }, ]; - let mut h = ConversationHistory::create_with_items(items); + let mut h = create_history_with_items(items); h.normalize_history(); } } diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 4ff8ee29acb..f8a58c3b2a5 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -40,6 +40,10 @@ impl SessionState { self.history.get_history() } + pub(crate) fn clone_history(&self) -> ConversationHistory { + self.history.clone() + } + pub(crate) fn replace_history(&mut self, items: Vec) { self.history.replace(items); } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 600a82d7cfd..de70771ec48 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -354,7 +354,7 @@ fn validate_request_body_invariants(request: &wiremock::Request) { return; }; let Some(items) = body.get("input").and_then(Value::as_array) else { - return; + panic!("input array not found in request"); }; use std::collections::HashSet;