Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 17 additions & 80 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
Expand Down Expand Up @@ -829,7 +828,7 @@ impl Session {
history.record_items(std::iter::once(response_item));
}
RolloutItem::Compacted(compacted) => {
let snapshot = history.contents();
let snapshot = history.get_history();
let user_messages = collect_user_messages(&snapshot);
let rebuilt = build_compacted_history(
self.build_initial_context(turn_context),
Expand All @@ -841,7 +840,7 @@ impl Session {
_ => {}
}
}
history.contents()
history.get_history()
}

/// Append ResponseItems to the in-memory conversation history only.
Expand Down Expand Up @@ -891,7 +890,7 @@ impl Session {
}

pub(crate) async fn history_snapshot(&self) -> Vec<ResponseItem> {
let state = self.state.lock().await;
let mut state = self.state.lock().await;
state.history_snapshot()
}

Expand Down Expand Up @@ -988,16 +987,6 @@ impl Session {
self.send_event(turn_context, event).await;
}

/// Build the full turn input by concatenating the current conversation
/// history with additional items for this turn.
pub async fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
let history = {
let state = self.state.lock().await;
state.history_snapshot()
};
[history, extra].concat()
}

/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
Expand Down Expand Up @@ -1500,11 +1489,13 @@ pub(crate) async fn run_task(
// model sees a fresh conversation without the parent session's history.
// For normal turns, continue recording to the session history as before.
let is_review_mode = turn_context.is_review_mode;
let mut review_thread_history: Vec<ResponseItem> = Vec::new();

let mut review_thread_history: ConversationHistory = ConversationHistory::new();
if is_review_mode {
// Seed review threads with environment context so the model knows the working directory.
review_thread_history.extend(sess.build_initial_context(turn_context.as_ref()));
review_thread_history.push(initial_input_for_turn.into());
review_thread_history
.record_items(sess.build_initial_context(turn_context.as_ref()).iter());
review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()));
} else {
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
.await;
Expand Down Expand Up @@ -1539,12 +1530,12 @@ pub(crate) async fn run_task(
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> = if is_review_mode {
if !pending_input.is_empty() {
review_thread_history.extend(pending_input);
review_thread_history.record_items(&pending_input);
}
review_thread_history.clone()
review_thread_history.get_history()
} else {
sess.record_conversation_items(&pending_input).await;
sess.turn_input_with_history(pending_input).await
sess.history_snapshot().await
};

let turn_input_messages: Vec<String> = turn_input
Expand Down Expand Up @@ -1682,7 +1673,7 @@ pub(crate) async fn run_task(
if !items_to_record_in_conversation_history.is_empty() {
if is_review_mode {
review_thread_history
.extend(items_to_record_in_conversation_history.clone());
.record_items(items_to_record_in_conversation_history.iter());
} else {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
Expand Down Expand Up @@ -1901,61 +1892,6 @@ async fn try_run_turn(
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> CodexResult<TurnRunResult> {
// call_ids that are part of this response.
let completed_call_ids = prompt
.input
.iter()
.filter_map(|ri| match ri {
ResponseItem::FunctionCallOutput { call_id, .. } => Some(call_id),
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
ResponseItem::CustomToolCallOutput { call_id, .. } => Some(call_id),
_ => None,
})
.collect::<Vec<_>>();

// call_ids that were pending but are not part of this response.
// This usually happens because the user interrupted the model before we responded to one of its tool calls
// and then the user sent a follow-up message.
let missing_calls = {
prompt
.input
.iter()
.filter_map(|ri| match ri {
ResponseItem::FunctionCall { call_id, .. } => Some(call_id),
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id),
ResponseItem::CustomToolCall { call_id, .. } => Some(call_id),
_ => None,
})
.filter_map(|call_id| {
if completed_call_ids.contains(&call_id) {
None
} else {
Some(call_id.clone())
}
})
.map(|call_id| ResponseItem::CustomToolCallOutput {
call_id,
output: "aborted".to_string(),
})
.collect::<Vec<_>>()
};
let prompt: Cow<Prompt> = if missing_calls.is_empty() {
Cow::Borrowed(prompt)
} else {
// Add the synthetic aborted missing calls to the beginning of the input to ensure all call ids have responses.
let input = [missing_calls, prompt.input.clone()].concat();
Cow::Owned(Prompt {
input,
..prompt.clone()
})
};

let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
Expand All @@ -1964,11 +1900,12 @@ async fn try_run_turn(
effort: turn_context.client.get_reasoning_effort(),
summary: turn_context.client.get_reasoning_summary(),
});

sess.persist_rollout_items(&[rollout_item]).await;
let mut stream = turn_context
.client
.clone()
.stream_with_task_kind(prompt.as_ref(), task_kind)
.stream_with_task_kind(prompt, task_kind)
.or_cancel(&cancellation_token)
.await??;

Expand Down Expand Up @@ -2951,7 +2888,7 @@ mod tests {
rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));

let summary1 = "summary one";
let snapshot1 = live_history.contents();
let snapshot1 = live_history.get_history();
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 = build_compacted_history(
session.build_initial_context(turn_context),
Expand Down Expand Up @@ -2984,7 +2921,7 @@ mod tests {
rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));

let summary2 = "summary two";
let snapshot2 = live_history.contents();
let snapshot2 = live_history.get_history();
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 = build_compacted_history(
session.build_initial_context(turn_context),
Expand Down Expand Up @@ -3016,7 +2953,7 @@ mod tests {
live_history.record_items(std::iter::once(&assistant3));
rollout_items.push(RolloutItem::ResponseItem(assistant3.clone()));

(rollout_items, live_history.contents())
(rollout_items, live_history.get_history())
}

#[tokio::test]
Expand Down
18 changes: 14 additions & 4 deletions codex-rs/core/src/codex/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::TurnContext;
use super::get_last_assistant_message_from_turn;
use crate::Prompt;
use crate::client_common::ResponseEvent;
use crate::conversation_history::ConversationHistory;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::AgentMessageEvent;
Expand All @@ -24,6 +25,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;
use tracing::error;

pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
Expand Down Expand Up @@ -64,9 +66,12 @@ async fn run_compact_task_inner(
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut turn_input = sess
.turn_input_with_history(vec![initial_input_for_turn.clone().into()])
.await;

let items = sess.history_snapshot().await;

let mut history = ConversationHistory::create_with_items(items);
history.record_items(&[initial_input_for_turn.into()]);

let mut truncated_count = 0usize;

let max_retries = turn_context.client.get_provider().stream_max_retries();
Expand All @@ -83,6 +88,7 @@ async fn run_compact_task_inner(
sess.persist_rollout_items(&[rollout_item]).await;

loop {
let turn_input = history.get_history();
let prompt = Prompt {
input: turn_input.clone(),
..Default::default()
Expand All @@ -107,7 +113,11 @@ async fn run_compact_task_inner(
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
turn_input.remove(0);
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
truncated_count += 1;
retries = 0;
continue;
Expand Down
Loading
Loading