Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 6 additions & 26 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ use codex_login::ShutdownHandle;
use codex_login::run_login_server;
use codex_protocol::ConversationId;
use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::models::ContentItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
Expand Down Expand Up @@ -940,18 +939,9 @@ impl CodexMessageProcessor {
},
))
.await;
let initial_messages = session_configured.initial_messages.map(|msgs| {
msgs.into_iter()
.filter(|event| {
// Don't send non-plain user messages (like user instructions
// or environment context) back so they don't get rendered.
if let EventMsg::UserMessage(user_message) = event {
return matches!(user_message.kind, Some(InputMessageKind::Plain));
}
true
})
.collect()
});
let initial_messages = session_configured
.initial_messages
.map(|msgs| msgs.into_iter().collect());

// Reply with conversation id + model and initial messages (when present)
let response = codex_app_server_protocol::ResumeConversationResponse {
Expand Down Expand Up @@ -1596,18 +1586,8 @@ fn extract_conversation_summary(
let preview = head
.iter()
.filter_map(|value| serde_json::from_value::<ResponseItem>(value.clone()).ok())
.find_map(|item| match item {
ResponseItem::Message { content, .. } => {
content.into_iter().find_map(|content| match content {
ContentItem::InputText { text } => {
match InputMessageKind::from(("user", &text)) {
InputMessageKind::Plain => Some(text),
_ => None,
}
}
_ => None,
})
}
.find_map(|item| match codex_core::parse_turn_item(&item) {
Some(TurnItem::UserMessage(user)) => Some(user.message()),
_ => None,
})?;

Expand Down
38 changes: 0 additions & 38 deletions codex-rs/app-server/tests/suite/codex_message_processor_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use codex_protocol::config_types::SandboxMode;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InputMessageKind;
use pretty_assertions::assert_eq;
use std::env;
use tempfile::TempDir;
Expand Down Expand Up @@ -528,43 +527,6 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() {
.expect("sendUserTurn 2 timeout")
.expect("sendUserTurn 2 resp");

let mut env_message: Option<String> = None;
let second_cwd_str = second_cwd.to_string_lossy().into_owned();
for _ in 0..10 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/user_message"),
)
.await
.expect("user_message timeout")
.expect("user_message notification");
let params = notification
.params
.clone()
.expect("user_message should include params");
let event: Event = serde_json::from_value(params).expect("deserialize user_message event");
if let EventMsg::UserMessage(user) = event.msg
&& matches!(user.kind, Some(InputMessageKind::EnvironmentContext))
&& user.message.contains(&second_cwd_str)
{
env_message = Some(user.message);
break;
}
}
let env_message = env_message.expect("expected environment context update");
assert!(
env_message.contains("<sandbox_mode>danger-full-access</sandbox_mode>"),
"env context should reflect new sandbox mode: {env_message}"
);
assert!(
env_message.contains("<network_access>enabled</network_access>"),
"env context should enable network access for danger-full-access policy: {env_message}"
);
assert!(
env_message.contains(&second_cwd_str),
"env context should include updated cwd: {env_message}"
);

let exec_begin_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/exec_command_begin"),
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/chat_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ pub(crate) async fn stream_chat_completions(
} = item
{
let mut text = String::new();
for c in items {
match c {
ReasoningItemContent::ReasoningText { text: t }
| ReasoningItemContent::Text { text: t } => text.push_str(t),
for entry in items {
match entry {
ReasoningItemContent::ReasoningText { text: segment }
| ReasoningItemContent::Text { text: segment } => text.push_str(segment),
}
}
if text.trim().is_empty() {
Expand Down
128 changes: 81 additions & 47 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ use std::sync::atomic::AtomicU64;

use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::mcp::auth::McpAuthStatusEntry;
use crate::parse_command::parse_command;
use crate::parse_turn_item;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
Expand Down Expand Up @@ -270,7 +270,6 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) item_collector: ItemCollector,
}

impl TurnContext {
Expand Down Expand Up @@ -359,7 +358,6 @@ impl Session {
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
sub_id: String,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
Expand Down Expand Up @@ -394,8 +392,6 @@ impl Session {
features: &config.features,
});

let item_collector = ItemCollector::new(tx_event, conversation_id, sub_id.clone());

TurnContext {
sub_id,
client,
Expand All @@ -409,7 +405,6 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
item_collector,
}
}

Expand Down Expand Up @@ -665,7 +660,6 @@ impl Session {
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
sub_id,
);
if let Some(final_schema) = updates.final_output_json_schema {
Expand Down Expand Up @@ -710,6 +704,59 @@ impl Session {
}
}

async fn emit_turn_item_started(&self, turn_context: &TurnContext, item: &TurnItem) {
self.send_event(
turn_context,
EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
}),
)
.await;
}

async fn emit_turn_item_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
emit_raw_agent_reasoning: bool,
) {
self.send_event(
turn_context,
EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
}),
)
.await;
self.emit_turn_item_legacy_events(turn_context, &item, emit_raw_agent_reasoning)
.await;
}

async fn emit_turn_item_started_completed(
&self,
turn_context: &TurnContext,
item: TurnItem,
emit_raw_agent_reasoning: bool,
) {
self.emit_turn_item_started(turn_context, &item).await;
self.emit_turn_item_completed(turn_context, item, emit_raw_agent_reasoning)
.await;
}

async fn emit_turn_item_legacy_events(
&self,
turn_context: &TurnContext,
item: &TurnItem,
emit_raw_agent_reasoning: bool,
) {
for event in item.as_legacy_events(emit_raw_agent_reasoning) {
self.send_event(turn_context, event).await;
}
}

/// Emit an exec approval request event and await the user's decision.
///
/// The request is keyed by `sub_id`/`call_id` so matching responses are delivered
Expand Down Expand Up @@ -946,24 +993,22 @@ impl Session {

/// Record a user input item to conversation history and also persist a
/// corresponding UserMessage EventMsg to rollout.
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
async fn record_input_and_rollout_usermsg(
&self,
turn_context: &TurnContext,
response_input: &ResponseInputItem,
) {
let response_item: ResponseItem = response_input.clone().into();
// Add to conversation history and persist response item to rollout
self.record_conversation_items(std::slice::from_ref(&response_item))
.await;

// Derive user message events and persist only UserMessage to rollout
let msgs =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning());
let user_msgs: Vec<RolloutItem> = msgs
.into_iter()
.filter_map(|m| match m {
EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))),
_ => None,
})
.collect();
if !user_msgs.is_empty() {
self.persist_rollout_items(&user_msgs).await;
let turn_item = parse_turn_item(&response_item);

if let Some(item @ TurnItem::UserMessage(_)) = turn_item {
self.emit_turn_item_started_completed(turn_context, item, false)
.await;
}
}

Expand Down Expand Up @@ -1158,19 +1203,8 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
{
sess.record_conversation_items(std::slice::from_ref(&env_item))
.await;
for msg in map_response_item_to_event_messages(
&env_item,
sess.show_raw_agent_reasoning(),
) {
sess.send_event(&current_context, msg).await;
}
}

current_context
.item_collector
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;

sess.spawn_task(Arc::clone(&current_context), items, RegularTask)
.await;
previous_context = Some(current_context);
Expand Down Expand Up @@ -1444,11 +1478,6 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(
sess.get_tx_event(),
sess.conversation_id,
sub_id.to_string(),
),
};

// Seed the child task with the review prompt as the initial user message.
Expand Down Expand Up @@ -1506,7 +1535,7 @@ pub(crate) async fn run_task(
review_thread_history.extend(sess.build_initial_context(turn_context.as_ref()));
review_thread_history.push(initial_input_for_turn.into());
} else {
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
}

Expand Down Expand Up @@ -2023,9 +2052,10 @@ async fn try_run_turn(
}
Ok(None) => {
let response = handle_non_tool_response_item(
Arc::clone(&sess),
sess.as_ref(),
Arc::clone(&turn_context),
item.clone(),
sess.show_raw_agent_reasoning(),
)
.await?;
add_completed(ProcessedResponseItem { item, response });
Expand Down Expand Up @@ -2144,25 +2174,31 @@ async fn try_run_turn(
}

async fn handle_non_tool_response_item(
sess: Arc<Session>,
sess: &Session,
turn_context: Arc<TurnContext>,
item: ResponseItem,
show_raw_agent_reasoning: bool,
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");

match &item {
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let msgs = match &item {
let turn_item = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
Vec::new()
None
}
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()),
_ => parse_turn_item(&item),
};
for msg in msgs {
sess.send_event(&turn_context, msg).await;
if let Some(turn_item) = turn_item {
sess.emit_turn_item_started_completed(
turn_context.as_ref(),
turn_item,
show_raw_agent_reasoning,
)
.await;
}
}
ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => {
Expand Down Expand Up @@ -2649,7 +2685,6 @@ mod tests {
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
"turn_id".to_string(),
);

Expand Down Expand Up @@ -2718,7 +2753,6 @@ mod tests {
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
"turn_id".to_string(),
));

Expand Down
Loading
Loading