diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0d9fe4b089..90ec928375 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -77,7 +77,6 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; -use codex_core::protocol::InputItem as CoreInputItem; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; use codex_login::ServerOptions as LoginServerOptions; @@ -89,6 +88,7 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InputMessageKind; use codex_protocol::protocol::USER_MESSAGE_BEGIN; +use codex_protocol::user_input::UserInput as CoreInputItem; use codex_utils_json_to_toml::json_to_toml; use std::collections::HashMap; use std::ffi::OsStr; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 170e00c71e..6c3b6ee472 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -10,12 +10,15 @@ use crate::event_mapping::map_response_item_to_event_messages; use crate::function_tool::FunctionCallError; use crate::parse_command::parse_command; use crate::review_format::format_review_findings_block; +use crate::state::ItemCollector; use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; use async_channel::Sender; use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; use codex_protocol::protocol::ConversationPathResponseEvent; use codex_protocol::protocol::ExitedReviewModeEvent; use codex_protocol::protocol::McpAuthStatus; @@ -77,7 +80,6 @@ use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; -use crate::protocol::InputItem; use crate::protocol::ListCustomPromptsResponseEvent; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; @@ -122,6 +124,7 @@ use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::InitialHistory; +use codex_protocol::user_input::UserInput; pub mod compact; use self::compact::build_compacted_history; @@ -264,6 +267,7 @@ pub(crate) struct TurnContext { pub(crate) is_review_mode: bool, pub(crate) final_output_json_schema: Option, pub(crate) codex_linux_sandbox_exe: Option, + pub(crate) item_collector: ItemCollector, } impl TurnContext { @@ -352,6 +356,7 @@ impl Session { provider: ModelProviderInfo, session_configuration: &SessionConfiguration, conversation_id: ConversationId, + tx_event: Sender, ) -> TurnContext { let config = session_configuration.original_config_do_not_use.clone(); let model_family = find_family_for_model(&session_configuration.model) @@ -397,6 +402,7 @@ impl Session { is_review_mode: false, final_output_json_schema: None, codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), + item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()), } } @@ -656,6 +662,7 @@ impl Session { session_configuration.provider.clone(), &session_configuration, self.conversation_id, + self.get_tx_event(), ); if let Some(final_schema) = updates.final_output_json_schema { turn_context.final_output_json_schema = final_schema; @@ -986,7 +993,7 @@ impl Session { } /// Returns the input if there was no task running to inject into - pub async fn inject_input(&self, input: Vec) -> Result<(), Vec> { + pub async fn inject_input(&self, input: Vec) -> Result<(), Vec> { let mut active = self.active_turn.lock().await; match active.as_mut() { Some(at) => { @@ -1157,6 +1164,11 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv } } + current_context + .item_collector + .started_completed(TurnItem::UserMessage(UserMessageItem::new(&items))) + .await; + sess.spawn_task(Arc::clone(¤t_context), sub.id, items, RegularTask) .await; previous_context = Some(current_context); @@ -1268,7 +1280,7 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await; // Attempt to inject input into current task if let Err(items) = sess - .inject_input(vec![InputItem::Text { + .inject_input(vec![UserInput::Text { text: compact::SUMMARIZATION_PROMPT.to_string(), }]) .await @@ -1422,10 +1434,15 @@ async fn spawn_review_thread( is_review_mode: true, final_output_json_schema: None, codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), + item_collector: ItemCollector::new( + sess.get_tx_event(), + sess.conversation_id, + sub_id.to_string(), + ), }; // Seed the child task with the review prompt as the initial user message. - let input: Vec = vec![InputItem::Text { + let input: Vec = vec![UserInput::Text { text: review_prompt, }]; let tc = Arc::new(review_turn_context); @@ -1463,7 +1480,7 @@ pub(crate) async fn run_task( sess: Arc, turn_context: Arc, sub_id: String, - input: Vec, + input: Vec, task_kind: TaskKind, cancellation_token: CancellationToken, ) -> Option { @@ -2624,6 +2641,15 @@ mod tests { tool_approvals: Mutex::new(ApprovalStore::default()), }; + let turn_context = Session::make_turn_context( + Some(Arc::clone(&auth_manager)), + &otel_event_manager, + session_configuration.provider.clone(), + &session_configuration, + conversation_id, + tx_event.clone(), + ); + let session = Session { conversation_id, tx_event, @@ -2633,13 +2659,6 @@ mod tests { next_internal_sub_id: AtomicU64::new(0), }; - let turn_context = Session::make_turn_context( - Some(Arc::clone(&auth_manager)), - &otel_event_manager, - session_configuration.provider.clone(), - &session_configuration, - conversation_id, - ); (session, turn_context) } @@ -2690,6 +2709,15 @@ mod tests { tool_approvals: Mutex::new(ApprovalStore::default()), }; + let turn_context = Arc::new(Session::make_turn_context( + Some(Arc::clone(&auth_manager)), + &otel_event_manager, + session_configuration.provider.clone(), + &session_configuration, + conversation_id, + tx_event.clone(), + )); + let session = Arc::new(Session { conversation_id, tx_event, @@ -2699,13 +2727,6 @@ mod tests { next_internal_sub_id: AtomicU64::new(0), }); - let turn_context = Arc::new(Session::make_turn_context( - Some(Arc::clone(&auth_manager)), - &otel_event_manager, - session_configuration.provider.clone(), - &session_configuration, - conversation_id, - )); (session, turn_context, rx_event) } @@ -2726,7 +2747,7 @@ mod tests { _session: Arc, _ctx: Arc, _sub_id: String, - _input: Vec, + _input: Vec, cancellation_token: CancellationToken, ) -> Option { if self.listen_to_cancellation_token { @@ -2750,7 +2771,7 @@ mod tests { async fn abort_regular_task_emits_turn_aborted_only() { let (sess, tc, rx) = make_session_and_context_with_rx(); let sub_id = "sub-regular".to_string(); - let input = vec![InputItem::Text { + let input = vec![UserInput::Text { text: "hello".to_string(), }]; sess.spawn_task( @@ -2781,7 +2802,7 @@ mod tests { async fn abort_gracefuly_emits_turn_aborted_only() { let (sess, tc, rx) = make_session_and_context_with_rx(); let sub_id = "sub-regular".to_string(); - let input = vec![InputItem::Text { + let input = vec![UserInput::Text { text: "hello".to_string(), }]; sess.spawn_task( @@ -2809,7 +2830,7 @@ mod tests { async fn abort_review_task_emits_exited_then_aborted_and_records_history() { let (sess, tc, rx) = make_session_and_context_with_rx(); let sub_id = "sub-review".to_string(); - let input = vec![InputItem::Text { + let input = vec![UserInput::Text { text: "start review".to_string(), }]; sess.spawn_task( diff --git a/codex-rs/core/src/codex/compact.rs b/codex-rs/core/src/codex/compact.rs index 93bbfa79c6..a6bb8a8fb4 100644 --- a/codex-rs/core/src/codex/compact.rs +++ b/codex-rs/core/src/codex/compact.rs @@ -12,7 +12,6 @@ use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; use crate::protocol::Event; use crate::protocol::EventMsg; -use crate::protocol::InputItem; use crate::protocol::InputMessageKind; use crate::protocol::TaskStartedEvent; use crate::protocol::TurnContextItem; @@ -24,6 +23,7 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RolloutItem; +use codex_protocol::user_input::UserInput; use futures::prelude::*; pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md"); @@ -41,7 +41,7 @@ pub(crate) async fn run_inline_auto_compact_task( turn_context: Arc, ) { let sub_id = sess.next_internal_sub_id(); - let input = vec![InputItem::Text { + let input = vec![UserInput::Text { text: SUMMARIZATION_PROMPT.to_string(), }]; run_compact_task_inner(sess, turn_context, sub_id, input).await; @@ -51,7 +51,7 @@ pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, sub_id: String, - input: Vec, + input: Vec, ) -> Option { let start_event = Event { id: sub_id.clone(), @@ -68,7 +68,7 @@ async fn run_compact_task_inner( sess: Arc, turn_context: Arc, sub_id: String, - input: Vec, + input: Vec, ) { let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut turn_input = sess diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 8fc39e79f5..fdf1f5ccb2 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -71,6 +71,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::PlanUpdate(_) | EventMsg::ShutdownComplete | EventMsg::ViewImageToolCall(_) - | EventMsg::ConversationPath(_) => false, + | EventMsg::ConversationPath(_) + | EventMsg::ItemStarted(_) + | EventMsg::ItemCompleted(_) => false, } } diff --git a/codex-rs/core/src/state/item_collector.rs b/codex-rs/core/src/state/item_collector.rs new file mode 100644 index 0000000000..bd1c4329d8 --- /dev/null +++ b/codex-rs/core/src/state/item_collector.rs @@ -0,0 +1,68 @@ +use async_channel::Sender; +use codex_protocol::ConversationId; +use codex_protocol::items::TurnItem; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ItemCompletedEvent; +use codex_protocol::protocol::ItemStartedEvent; +use tracing::error; + +#[derive(Debug)] +pub(crate) struct ItemCollector { + thread_id: ConversationId, + turn_id: String, + tx_event: Sender, +} + +impl ItemCollector { + pub fn new( + tx_event: Sender, + thread_id: ConversationId, + turn_id: String, + ) -> ItemCollector { + ItemCollector { + tx_event, + thread_id, + turn_id, + } + } + + pub async fn started(&self, item: TurnItem) { + let err = self + .tx_event + .send(Event { + id: self.turn_id.clone(), + msg: EventMsg::ItemStarted(ItemStartedEvent { + thread_id: self.thread_id, + turn_id: self.turn_id.clone(), + item, + }), + }) + .await; + if let Err(e) = err { + error!("failed to send item started event: {e}"); + } + } + + pub async fn completed(&self, item: TurnItem) { + let err = self + .tx_event + .send(Event { + id: self.turn_id.clone(), + msg: EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: self.thread_id, + turn_id: self.turn_id.clone(), + item, + }), + }) + .await; + if let Err(e) = err { + error!("failed to send item completed event: {e}"); + } + } + + pub async fn started_completed(&self, item: TurnItem) { + self.started(item.clone()).await; + self.completed(item).await; + } +} diff --git a/codex-rs/core/src/state/mod.rs b/codex-rs/core/src/state/mod.rs index 642433a786..7ba5f37083 100644 --- a/codex-rs/core/src/state/mod.rs +++ b/codex-rs/core/src/state/mod.rs @@ -1,7 +1,9 @@ +mod item_collector; mod service; mod session; mod turn; +pub(crate) use item_collector::ItemCollector; pub(crate) use service::SessionServices; pub(crate) use session::SessionState; pub(crate) use turn::ActiveTurn; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 12e61b1f65..a27e68ddd8 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken; use crate::codex::TurnContext; use crate::codex::compact; -use crate::protocol::InputItem; use crate::state::TaskKind; +use codex_protocol::user_input::UserInput; use super::SessionTask; use super::SessionTaskContext; @@ -25,7 +25,7 @@ impl SessionTask for CompactTask { session: Arc, ctx: Arc, sub_id: String, - input: Vec, + input: Vec, _cancellation_token: CancellationToken, ) -> Option { compact::run_compact_task(session.clone_session(), ctx, sub_id, input).await diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 15ec419f9b..6a5dff6ab9 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -17,13 +17,13 @@ use crate::codex::Session; use crate::codex::TurnContext; use crate::protocol::Event; use crate::protocol::EventMsg; -use crate::protocol::InputItem; use crate::protocol::TaskCompleteEvent; use crate::protocol::TurnAbortReason; use crate::protocol::TurnAbortedEvent; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; +use codex_protocol::user_input::UserInput; pub(crate) use compact::CompactTask; pub(crate) use regular::RegularTask; @@ -56,7 +56,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static { session: Arc, ctx: Arc, sub_id: String, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> Option; @@ -70,7 +70,7 @@ impl Session { self: &Arc, turn_context: Arc, sub_id: String, - input: Vec, + input: Vec, task: T, ) { self.abort_all_tasks(TurnAbortReason::Replaced).await; diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index 3a3fa267ce..a79d842c40 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken; use crate::codex::TurnContext; use crate::codex::run_task; -use crate::protocol::InputItem; use crate::state::TaskKind; +use codex_protocol::user_input::UserInput; use super::SessionTask; use super::SessionTaskContext; @@ -25,7 +25,7 @@ impl SessionTask for RegularTask { session: Arc, ctx: Arc, sub_id: String, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> Option { let sess = session.clone_session(); diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 6b4b2175db..e4dfeec69f 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -6,8 +6,8 @@ use tokio_util::sync::CancellationToken; use crate::codex::TurnContext; use crate::codex::exit_review_mode; use crate::codex::run_task; -use crate::protocol::InputItem; use crate::state::TaskKind; +use codex_protocol::user_input::UserInput; use super::SessionTask; use super::SessionTaskContext; @@ -26,7 +26,7 @@ impl SessionTask for ReviewTask { session: Arc, ctx: Arc, sub_id: String, - input: Vec, + input: Vec, cancellation_token: CancellationToken, ) -> Option { let sess = session.clone_session(); diff --git a/codex-rs/core/src/tools/handlers/view_image.rs b/codex-rs/core/src/tools/handlers/view_image.rs index 2396e19ced..43cada47b2 100644 --- a/codex-rs/core/src/tools/handlers/view_image.rs +++ b/codex-rs/core/src/tools/handlers/view_image.rs @@ -5,13 +5,13 @@ use tokio::fs; use crate::function_tool::FunctionCallError; use crate::protocol::Event; use crate::protocol::EventMsg; -use crate::protocol::InputItem; use crate::protocol::ViewImageToolCallEvent; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; +use codex_protocol::user_input::UserInput; pub struct ViewImageHandler; @@ -67,7 +67,7 @@ impl ToolHandler for ViewImageHandler { let event_path = abs_path.clone(); session - .inject_input(vec![InputItem::LocalImage { path: abs_path }]) + .inject_input(vec![UserInput::LocalImage { path: abs_path }]) .await .map_err(|_| { FunctionCallError::RespondToModel( diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 5944fa9481..0064d0e5a4 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -134,6 +134,14 @@ where wait_for_event_with_timeout(codex, predicate, Duration::from_secs(1)).await } +pub async fn wait_for_event_match(codex: &CodexConversation, matcher: F) -> T +where + F: Fn(&codex_core::protocol::EventMsg) -> Option, +{ + let ev = wait_for_event(codex, |ev| matcher(ev).is_some()).await; + matcher(&ev).unwrap() +} + pub async fn wait_for_event_with_timeout( codex: &CodexConversation, mut predicate: F, diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index 5122f6611e..dcc65fc742 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -1,8 +1,8 @@ use std::time::Duration; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; use core_test_support::responses::mount_sse_once; @@ -42,7 +42,7 @@ async fn interrupt_long_running_tool_emits_turn_aborted() { // Kick off a turn that triggers the function call. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "start sleep".into(), }], }) diff --git a/codex-rs/core/tests/suite/approvals.rs b/codex-rs/core/tests/suite/approvals.rs index 0ce1183edd..2d90d98f3f 100644 --- a/codex-rs/core/tests/suite/approvals.rs +++ b/codex-rs/core/tests/suite/approvals.rs @@ -6,11 +6,11 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::protocol::ReviewDecision; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_apply_patch_function_call; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -374,7 +374,7 @@ async fn submit_turn( test.codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 7bc22449a8..ed16cdb35d 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -17,13 +17,13 @@ use codex_core::built_in_model_providers; use codex_core::error::CodexErr; use codex_core::model_family::find_family_for_model; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SessionSource; use codex_otel::otel_event_manager::OtelEventManager; use codex_protocol::ConversationId; use codex_protocol::models::ReasoningItemReasoningSummary; use codex_protocol::models::WebSearchAction; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::responses; @@ -263,7 +263,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() { // 2) Submit new input; the request body must include the prior item followed by the new user input. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -335,7 +335,7 @@ async fn includes_conversation_id_and_model_headers_in_request() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -390,7 +390,7 @@ async fn includes_base_instructions_override_in_request() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -450,7 +450,7 @@ async fn chatgpt_auth_sends_correct_request() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -540,7 +540,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -579,7 +579,7 @@ async fn includes_user_instructions_message_in_request() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -795,7 +795,7 @@ async fn token_count_includes_rate_limits_snapshot() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -946,7 +946,7 @@ async fn usage_limit_error_emits_rate_limit_event() -> anyhow::Result<()> { let submission_id = codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -1016,7 +1016,7 @@ async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Res codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "seed turn".into(), }], }) @@ -1026,7 +1026,7 @@ async fn context_window_error_sets_total_tokens_to_model_window() -> anyhow::Res codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "trigger context window".into(), }], }) @@ -1146,7 +1146,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -1223,7 +1223,7 @@ async fn env_var_overrides_loaded_auth() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -1301,7 +1301,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { // Turn 1: user sends U1; wait for completion. codex .submit(Op::UserInput { - items: vec![InputItem::Text { text: "U1".into() }], + items: vec![UserInput::Text { text: "U1".into() }], }) .await .unwrap(); @@ -1310,7 +1310,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { // Turn 2: user sends U2; wait for completion. codex .submit(Op::UserInput { - items: vec![InputItem::Text { text: "U2".into() }], + items: vec![UserInput::Text { text: "U2".into() }], }) .await .unwrap(); @@ -1319,7 +1319,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() { // Turn 3: user sends U3; wait for completion. codex .submit(Op::UserInput { - items: vec![InputItem::Text { text: "U3".into() }], + items: vec![UserInput::Text { text: "U3".into() }], }) .await .unwrap(); diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index bd760b8749..17d72abe7e 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -5,10 +5,10 @@ use codex_core::NewConversation; use codex_core::built_in_model_providers; use codex_core::protocol::ErrorEvent; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; use core_test_support::wait_for_event; @@ -108,7 +108,7 @@ async fn summarize_context_three_requests_and_instructions() { // 1) Normal user input – should hit server once. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello world".into(), }], }) @@ -123,7 +123,7 @@ async fn summarize_context_three_requests_and_instructions() { // 3) Next user input – third hit; history should include only the summary. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: THIRD_USER_MSG.into(), }], }) @@ -324,7 +324,7 @@ async fn auto_compact_runs_after_token_limit_hit() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: FIRST_AUTO_MSG.into(), }], }) @@ -335,7 +335,7 @@ async fn auto_compact_runs_after_token_limit_hit() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: SECOND_AUTO_MSG.into(), }], }) @@ -468,7 +468,7 @@ async fn auto_compact_persists_rollout_entries() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: FIRST_AUTO_MSG.into(), }], }) @@ -478,7 +478,7 @@ async fn auto_compact_persists_rollout_entries() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: SECOND_AUTO_MSG.into(), }], }) @@ -580,7 +580,7 @@ async fn auto_compact_stops_after_failed_attempt() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: FIRST_AUTO_MSG.into(), }], }) @@ -674,7 +674,7 @@ async fn manual_compact_retries_after_context_window_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "first turn".into(), }], }) @@ -802,7 +802,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: MULTI_AUTO_MSG.into(), }], }) @@ -913,7 +913,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: FUNCTION_CALL_LIMIT_MSG.into(), }], }) diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index 8197bed10f..4261a30510 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -20,9 +20,9 @@ use codex_core::config::Config; use codex_core::config::OPENAI_DEFAULT_MODEL; use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -777,7 +777,7 @@ async fn start_test_conversation( async fn user_turn(conversation: &Arc, text: &str) { conversation .submit(Op::UserInput { - items: vec![InputItem::Text { text: text.into() }], + items: vec![UserInput::Text { text: text.into() }], }) .await .expect("submit user turn"); diff --git a/codex-rs/core/tests/suite/fork_conversation.rs b/codex-rs/core/tests/suite/fork_conversation.rs index 44c0cd8195..28f5d0175d 100644 --- a/codex-rs/core/tests/suite/fork_conversation.rs +++ b/codex-rs/core/tests/suite/fork_conversation.rs @@ -9,10 +9,10 @@ use codex_core::content_items_to_text; use codex_core::is_session_prefix_message; use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::skip_if_no_network; use core_test_support::wait_for_event; @@ -71,7 +71,7 @@ async fn fork_conversation_twice_drops_to_first_message() { for text in ["first", "second", "third"] { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: text.to_string(), }], }) diff --git a/codex-rs/core/tests/suite/grep_files.rs b/codex-rs/core/tests/suite/grep_files.rs index 31195f7e3b..f8097558d9 100644 --- a/codex-rs/core/tests/suite/grep_files.rs +++ b/codex-rs/core/tests/suite/grep_files.rs @@ -4,10 +4,10 @@ use anyhow::Result; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -149,7 +149,7 @@ async fn submit_turn(test: &TestCodex, prompt: &str) -> Result<()> { test.codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs new file mode 100644 index 0000000000..a64f51187a --- /dev/null +++ b/codex-rs/core/tests/suite/items.rs @@ -0,0 +1,68 @@ +#![cfg(not(target_os = "windows"))] + +use anyhow::Ok; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_protocol::items::TurnItem; +use codex_protocol::user_input::UserInput; +use core_test_support::responses; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::skip_if_no_network; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event_match; +use pretty_assertions::assert_eq; +use wiremock::matchers::any; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn user_message_item_is_emitted() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { codex, .. } = test_codex().build(&server).await?; + + let first_response = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]); + responses::mount_sse_once_match(&server, any(), first_response).await; + + codex + .submit(Op::UserInput { + items: (vec![UserInput::Text { + text: "please inspect sample.txt".into(), + }]), + }) + .await?; + + let started = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemStarted(e) => Some(e.clone()), + _ => None, + }) + .await; + + let completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(e) => Some(e.clone()), + _ => None, + }) + .await; + + let TurnItem::UserMessage(started_item) = started.item; + let TurnItem::UserMessage(completed_item) = completed.item; + + assert_eq!(started_item.id, completed_item.id); + assert_eq!( + started_item.content, + vec![UserInput::Text { + text: "please inspect sample.txt".into(), + }] + ); + assert_eq!( + completed_item.content, + vec![UserInput::Text { + text: "please inspect sample.txt".into(), + }] + ); + Ok(()) +} diff --git a/codex-rs/core/tests/suite/json_result.rs b/codex-rs/core/tests/suite/json_result.rs index e8d91fae8b..cdfa28e0ae 100644 --- a/codex-rs/core/tests/suite/json_result.rs +++ b/codex-rs/core/tests/suite/json_result.rs @@ -2,10 +2,10 @@ use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; @@ -74,7 +74,7 @@ async fn codex_returns_json_result(model: String) -> anyhow::Result<()> { // 1) Normal user input – should hit server once. codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello world".into(), }], final_output_json_schema: Some(serde_json::from_str(SCHEMA)?), diff --git a/codex-rs/core/tests/suite/list_dir.rs b/codex-rs/core/tests/suite/list_dir.rs index 1aa5a6484b..2a04d3075d 100644 --- a/codex-rs/core/tests/suite/list_dir.rs +++ b/codex-rs/core/tests/suite/list_dir.rs @@ -2,10 +2,10 @@ use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -66,7 +66,7 @@ async fn list_dir_tool_returns_entries() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "list directory contents".into(), }], final_output_json_schema: None, @@ -171,7 +171,7 @@ async fn list_dir_tool_depth_one_omits_children() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "list directory contents depth one".into(), }], final_output_json_schema: None, @@ -283,7 +283,7 @@ async fn list_dir_tool_depth_two_includes_children_only() -> anyhow::Result<()> codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "list directory contents depth two".into(), }], final_output_json_schema: None, @@ -398,7 +398,7 @@ async fn list_dir_tool_depth_three_includes_grandchildren() -> anyhow::Result<() codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "list directory contents depth three".into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 440eb1e893..22442d10d7 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -11,6 +11,7 @@ mod compact_resume_fork; mod exec; mod fork_conversation; mod grep_files; +mod items; mod json_result; mod list_dir; mod live_cli; diff --git a/codex-rs/core/tests/suite/model_tools.rs b/codex-rs/core/tests/suite/model_tools.rs index 71e64f9e3a..0b749d4741 100644 --- a/codex-rs/core/tests/suite/model_tools.rs +++ b/codex-rs/core/tests/suite/model_tools.rs @@ -7,8 +7,8 @@ use codex_core::built_in_model_providers; use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::responses; @@ -74,7 +74,7 @@ async fn collect_tool_identifiers_for_model(model: &str) -> Vec { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello tools".into(), }], }) diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index 63ad3ccaea..ec6d567cb6 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -1,9 +1,9 @@ use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::InputItem; use codex_protocol::protocol::Op; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_custom_tool_call; @@ -31,7 +31,7 @@ async fn responses_api_emits_api_request_event() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -77,7 +77,7 @@ async fn process_sse_emits_tracing_for_output_item() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -121,7 +121,7 @@ async fn process_sse_emits_failed_event_on_parse_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -166,7 +166,7 @@ async fn process_sse_records_failed_event_when_stream_closes_without_completed() codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -223,7 +223,7 @@ async fn process_sse_failed_event_records_response_error_message() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -278,7 +278,7 @@ async fn process_sse_failed_event_logs_parse_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -328,7 +328,7 @@ async fn process_sse_failed_event_logs_missing_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -378,7 +378,7 @@ async fn process_sse_failed_event_logs_response_completed_parse_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -433,7 +433,7 @@ async fn process_sse_emits_completed_telemetry() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -493,7 +493,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -557,7 +557,7 @@ async fn handle_response_item_records_tool_result_for_function_call() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -631,7 +631,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_missing_ids() codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -689,7 +689,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_call() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -787,7 +787,7 @@ async fn handle_container_exec_autoapprove_from_config_records_tool_decision() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) @@ -833,7 +833,7 @@ async fn handle_container_exec_user_approved_records_tool_decision() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "approved".into(), }], }) @@ -895,7 +895,7 @@ async fn handle_container_exec_user_approved_for_session_records_tool_decision() codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "persist".into(), }], }) @@ -957,7 +957,7 @@ async fn handle_sandbox_error_user_approves_retry_records_tool_decision() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "retry".into(), }], }) @@ -1019,7 +1019,7 @@ async fn handle_container_exec_user_denies_records_tool_decision() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "deny".into(), }], }) @@ -1081,7 +1081,7 @@ async fn handle_sandbox_error_user_approves_for_session_records_tool_decision() codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "persist".into(), }], }) @@ -1143,7 +1143,7 @@ async fn handle_sandbox_error_user_denies_records_tool_decision() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "deny".into(), }], }) diff --git a/codex-rs/core/tests/suite/prompt_caching.rs b/codex-rs/core/tests/suite/prompt_caching.rs index caa4cb68de..3add7bbfaa 100644 --- a/codex-rs/core/tests/suite/prompt_caching.rs +++ b/codex-rs/core/tests/suite/prompt_caching.rs @@ -9,13 +9,13 @@ use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_core::protocol_config_types::ReasoningEffort; use codex_core::protocol_config_types::ReasoningSummary; use codex_core::shell::Shell; use codex_core::shell::default_user_shell; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id; use core_test_support::skip_if_no_network; @@ -115,7 +115,7 @@ async fn codex_mini_latest_tools() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], }) @@ -125,7 +125,7 @@ async fn codex_mini_latest_tools() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], }) @@ -199,7 +199,7 @@ async fn prompt_tools_are_consistent_across_requests() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], }) @@ -209,7 +209,7 @@ async fn prompt_tools_are_consistent_across_requests() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], }) @@ -319,7 +319,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], }) @@ -329,7 +329,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], }) @@ -439,7 +439,7 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() { // First turn codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], }) @@ -468,7 +468,7 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() { // Second turn after overrides codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], }) @@ -567,7 +567,7 @@ async fn per_turn_overrides_keep_cached_prefix_and_key_constant() { // First turn codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], }) @@ -580,7 +580,7 @@ async fn per_turn_overrides_keep_cached_prefix_and_key_constant() { let writable = TempDir::new().unwrap(); codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], cwd: new_cwd.path().to_path_buf(), @@ -696,7 +696,7 @@ async fn send_user_turn_with_no_changes_does_not_send_environment_context() { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], cwd: default_cwd.clone(), @@ -713,7 +713,7 @@ async fn send_user_turn_with_no_changes_does_not_send_environment_context() { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], cwd: default_cwd.clone(), @@ -810,7 +810,7 @@ async fn send_user_turn_with_changes_sends_environment_context() { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 1".into(), }], cwd: default_cwd.clone(), @@ -827,7 +827,7 @@ async fn send_user_turn_with_changes_sends_environment_context() { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello 2".into(), }], cwd: default_cwd.clone(), diff --git a/codex-rs/core/tests/suite/read_file.rs b/codex-rs/core/tests/suite/read_file.rs index fc5a94f91b..a74bd8b2a9 100644 --- a/codex-rs/core/tests/suite/read_file.rs +++ b/codex-rs/core/tests/suite/read_file.rs @@ -2,10 +2,10 @@ use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -64,7 +64,7 @@ async fn read_file_tool_returns_requested_lines() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please inspect sample.txt".into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index f3eeb3a310..422acd922f 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -11,7 +11,6 @@ use codex_core::protocol::ConversationPathResponseEvent; use codex_core::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG; use codex_core::protocol::EventMsg; use codex_core::protocol::ExitedReviewModeEvent; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::ReviewCodeLocation; use codex_core::protocol::ReviewFinding; @@ -20,6 +19,7 @@ use codex_core::protocol::ReviewOutputEvent; use codex_core::protocol::ReviewRequest; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::user_input::UserInput; use core_test_support::load_default_config_for_test; use core_test_support::load_sse_fixture_with_id_from_str; use core_test_support::skip_if_no_network; @@ -566,7 +566,7 @@ async fn review_history_does_not_leak_into_parent_session() { let followup = "back to parent".to_string(); codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: followup.clone(), }], }) diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index ab702b5d91..609ae87118 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -14,10 +14,10 @@ use codex_core::features::Feature; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::mount_sse_once_match; use core_test_support::skip_if_no_network; @@ -104,7 +104,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { fixture .codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "call the rmcp echo tool".into(), }], final_output_json_schema: None, @@ -240,7 +240,7 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> { fixture .codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "call the rmcp echo tool".into(), }], final_output_json_schema: None, @@ -391,7 +391,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> { fixture .codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "call the rmcp streamable http echo tool".into(), }], final_output_json_schema: None, @@ -574,7 +574,7 @@ async fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> { fixture .codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "call the rmcp streamable http oauth echo tool".into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/shell_serialization.rs b/codex-rs/core/tests/suite/shell_serialization.rs index 21141ec535..50d0c99980 100644 --- a/codex-rs/core/tests/suite/shell_serialization.rs +++ b/codex-rs/core/tests/suite/shell_serialization.rs @@ -5,10 +5,10 @@ use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::assert_regex_match; use core_test_support::responses::ev_apply_patch_function_call; use core_test_support::responses::ev_assistant_message; @@ -35,7 +35,7 @@ async fn submit_turn(test: &TestCodex, prompt: &str, sandbox_policy: SandboxPoli test.codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs index 9c32c351e0..03f1be0f18 100644 --- a/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs +++ b/codex-rs/core/tests/suite/stream_error_allows_next_turn.rs @@ -3,8 +3,8 @@ use std::time::Duration; use codex_core::ModelProviderInfo; use codex_core::WireApi; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_protocol::user_input::UserInput; use core_test_support::load_sse_fixture_with_id; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; @@ -87,7 +87,7 @@ async fn continue_after_stream_error() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "first message".into(), }], }) @@ -114,7 +114,7 @@ async fn continue_after_stream_error() { // error above, this submission would be rejected/queued indefinitely. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "follow up".into(), }], }) diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index 4ef2b04fa1..19c2e24f0f 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -6,8 +6,8 @@ use std::time::Duration; use codex_core::ModelProviderInfo; use codex_core::WireApi; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_protocol::user_input::UserInput; use core_test_support::load_sse_fixture; use core_test_support::load_sse_fixture_with_id; use core_test_support::skip_if_no_network; @@ -94,7 +94,7 @@ async fn retries_on_early_close() { codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello".into(), }], }) diff --git a/codex-rs/core/tests/suite/tool_harness.rs b/codex-rs/core/tests/suite/tool_harness.rs index 68bd76bd8e..d9a7b0ead7 100644 --- a/codex-rs/core/tests/suite/tool_harness.rs +++ b/codex-rs/core/tests/suite/tool_harness.rs @@ -7,11 +7,11 @@ use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::plan_tool::StepStatus; +use codex_protocol::user_input::UserInput; use core_test_support::assert_regex_match; use core_test_support::responses; use core_test_support::responses::ev_apply_patch_function_call; @@ -74,7 +74,7 @@ async fn shell_tool_executes_command_and_streams_output() -> anyhow::Result<()> codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please run the shell command".into(), }], final_output_json_schema: None, @@ -143,7 +143,7 @@ async fn update_plan_tool_emits_plan_update_event() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please update the plan".into(), }], final_output_json_schema: None, @@ -226,7 +226,7 @@ async fn update_plan_tool_rejects_malformed_payload() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please update the plan".into(), }], final_output_json_schema: None, @@ -324,7 +324,7 @@ async fn apply_patch_tool_executes_and_emits_patch_events() -> anyhow::Result<() codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please apply a patch".into(), }], final_output_json_schema: None, @@ -425,7 +425,7 @@ async fn apply_patch_reports_parse_diagnostics() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please apply a patch".into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/tool_parallelism.rs b/codex-rs/core/tests/suite/tool_parallelism.rs index b4e3d1c9ad..a1e96fa02f 100644 --- a/codex-rs/core/tests/suite/tool_parallelism.rs +++ b/codex-rs/core/tests/suite/tool_parallelism.rs @@ -7,10 +7,10 @@ use std::time::Instant; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; @@ -28,7 +28,7 @@ async fn run_turn(test: &TestCodex, prompt: &str) -> anyhow::Result<()> { test.codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index ec07b0cdbd..46dd0ba811 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -6,10 +6,10 @@ use codex_core::features::Feature; use codex_core::model_family::find_family_for_model; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::assert_regex_match; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -38,7 +38,7 @@ async fn submit_turn( test.codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: prompt.into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/unified_exec.rs b/codex-rs/core/tests/suite/unified_exec.rs index 6298ab06de..78a1abf16b 100644 --- a/codex-rs/core/tests/suite/unified_exec.rs +++ b/codex-rs/core/tests/suite/unified_exec.rs @@ -6,10 +6,10 @@ use anyhow::Result; use codex_core::features::Feature; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; @@ -118,7 +118,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "run unified exec".into(), }], final_output_json_schema: None, @@ -254,7 +254,7 @@ PY codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "exercise lag handling".into(), }], final_output_json_schema: None, @@ -360,7 +360,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "check timeout".into(), }], final_output_json_schema: None, diff --git a/codex-rs/core/tests/suite/user_notification.rs b/codex-rs/core/tests/suite/user_notification.rs index 3390f4a65a..11deb70f3d 100644 --- a/codex-rs/core/tests/suite/user_notification.rs +++ b/codex-rs/core/tests/suite/user_notification.rs @@ -3,8 +3,8 @@ use std::os::unix::fs::PermissionsExt; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; +use codex_protocol::user_input::UserInput; use core_test_support::fs_wait; use core_test_support::responses; use core_test_support::skip_if_no_network; @@ -52,7 +52,7 @@ echo -n "${@: -1}" > $(dirname "${0}")/notify.txt"#, // 1) Normal user input – should hit server once. codex .submit(Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "hello world".into(), }], }) diff --git a/codex-rs/core/tests/suite/view_image.rs b/codex-rs/core/tests/suite/view_image.rs index bdb67ad631..e0dd2ab051 100644 --- a/codex-rs/core/tests/suite/view_image.rs +++ b/codex-rs/core/tests/suite/view_image.rs @@ -4,10 +4,10 @@ use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SandboxPolicy; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -90,7 +90,7 @@ async fn view_image_tool_attaches_local_image() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please add the screenshot".into(), }], final_output_json_schema: None, @@ -189,7 +189,7 @@ async fn view_image_tool_errors_when_path_is_directory() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please attach the folder".into(), }], final_output_json_schema: None, @@ -254,7 +254,7 @@ async fn view_image_tool_errors_when_file_missing() -> anyhow::Result<()> { codex .submit(Op::UserTurn { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: "please attach the missing image".into(), }], final_output_json_schema: None, diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index f9aa3f8598..9a85c5de71 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -517,6 +517,8 @@ impl EventProcessor for EventProcessorWithHumanOutput { EventMsg::AgentMessageDelta(_) => {} EventMsg::AgentReasoningDelta(_) => {} EventMsg::AgentReasoningRawContentDelta(_) => {} + EventMsg::ItemStarted(_) => {} + EventMsg::ItemCompleted(_) => {} } CodexStatus::Running } diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index afd9d44a73..f66c1db2df 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -23,12 +23,12 @@ use codex_core::git_info::get_git_repo_root; use codex_core::protocol::AskForApproval; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::SessionSource; use codex_core::protocol::TaskCompleteEvent; use codex_ollama::DEFAULT_OSS_MODEL; use codex_protocol::config_types::SandboxMode; +use codex_protocol::user_input::UserInput; use event_processor_with_human_output::EventProcessorWithHumanOutput; use event_processor_with_jsonl_output::EventProcessorWithJsonOutput; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; @@ -328,9 +328,9 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any // Send images first, if any. if !images.is_empty() { - let items: Vec = images + let items: Vec = images .into_iter() - .map(|path| InputItem::LocalImage { path }) + .map(|path| UserInput::LocalImage { path }) .collect(); let initial_images_event_id = conversation.submit(Op::UserInput { items }).await?; info!("Sent images with event ID: {initial_images_event_id}"); @@ -349,7 +349,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any } // Send the prompt. - let items: Vec = vec![InputItem::Text { text: prompt }]; + let items: Vec = vec![UserInput::Text { text: prompt }]; let initial_prompt_task_id = conversation .submit(Op::UserTurn { items, diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 370dde0e82..a59755008d 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -18,11 +18,11 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent; use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; -use codex_core::protocol::InputItem; use codex_core::protocol::Op; use codex_core::protocol::Submission; use codex_core::protocol::TaskCompleteEvent; use codex_protocol::ConversationId; +use codex_protocol::user_input::UserInput; use mcp_types::CallToolResult; use mcp_types::ContentBlock; use mcp_types::RequestId; @@ -91,7 +91,7 @@ pub async fn run_codex_tool_session( let submission = Submission { id: sub_id.clone(), op: Op::UserInput { - items: vec![InputItem::Text { + items: vec![UserInput::Text { text: initial_prompt.clone(), }], }, @@ -127,7 +127,7 @@ pub async fn run_codex_tool_session_reply( .insert(request_id.clone(), conversation_id); if let Err(e) = conversation .submit(Op::UserInput { - items: vec![InputItem::Text { text: prompt }], + items: vec![UserInput::Text { text: prompt }], }) .await { @@ -284,6 +284,8 @@ async fn run_codex_tool_session_inner( | EventMsg::ShutdownComplete | EventMsg::ViewImageToolCall(_) | EventMsg::EnteredReviewMode(_) + | EventMsg::ItemStarted(_) + | EventMsg::ItemCompleted(_) | EventMsg::ExitedReviewMode(_) => { // For now, we do not do anything extra for these // events. Note that diff --git a/codex-rs/otel/src/otel_event_manager.rs b/codex-rs/otel/src/otel_event_manager.rs index 666d83f822..486683dae0 100644 --- a/codex-rs/otel/src/otel_event_manager.rs +++ b/codex-rs/otel/src/otel_event_manager.rs @@ -6,9 +6,9 @@ use codex_protocol::config_types::ReasoningEffort; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::InputItem; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::user_input::UserInput; use eventsource_stream::Event as StreamEvent; use eventsource_stream::EventStreamError as StreamError; use reqwest::Error; @@ -308,11 +308,11 @@ impl OtelEventManager { ); } - pub fn user_prompt(&self, items: &[InputItem]) { + pub fn user_prompt(&self, items: &[UserInput]) { let prompt = items .iter() .flat_map(|item| match item { - InputItem::Text { text } => Some(text.as_str()), + UserInput::Text { text } => Some(text.as_str()), _ => None, }) .collect::(); diff --git a/codex-rs/protocol/Cargo.toml b/codex-rs/protocol/Cargo.toml index f21d326610..0393c85427 100644 --- a/codex-rs/protocol/Cargo.toml +++ b/codex-rs/protocol/Cargo.toml @@ -29,7 +29,7 @@ ts-rs = { workspace = true, features = [ "serde-json-impl", "no-serde-warnings", ] } -uuid = { workspace = true, features = ["serde", "v7"] } +uuid = { workspace = true, features = ["serde", "v7", "v4"] } [dev-dependencies] anyhow = { workspace = true } diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs new file mode 100644 index 0000000000..e1efc9525b --- /dev/null +++ b/codex-rs/protocol/src/items.rs @@ -0,0 +1,33 @@ +use crate::user_input::UserInput; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use ts_rs::TS; + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub enum TurnItem { + UserMessage(UserMessageItem), +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct UserMessageItem { + pub id: String, + pub content: Vec, +} + +impl UserMessageItem { + pub fn new(content: &[UserInput]) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + content: content.to_vec(), + } + } +} + +impl TurnItem { + pub fn id(&self) -> String { + match self { + TurnItem::UserMessage(item) => item.id.clone(), + } + } +} diff --git a/codex-rs/protocol/src/lib.rs b/codex-rs/protocol/src/lib.rs index 11ab0b3fd6..e79eff3f56 100644 --- a/codex-rs/protocol/src/lib.rs +++ b/codex-rs/protocol/src/lib.rs @@ -2,9 +2,11 @@ mod conversation_id; pub use conversation_id::ConversationId; pub mod config_types; pub mod custom_prompts; +pub mod items; pub mod message_history; pub mod models; pub mod num_format; pub mod parse_command; pub mod plan_tool; pub mod protocol; +pub mod user_input; diff --git a/codex-rs/protocol/src/models.rs b/codex-rs/protocol/src/models.rs index 285ae666f9..4e99455bd2 100644 --- a/codex-rs/protocol/src/models.rs +++ b/codex-rs/protocol/src/models.rs @@ -8,7 +8,7 @@ use serde::Serialize; use serde::ser::Serializer; use ts_rs::TS; -use crate::protocol::InputItem; +use crate::user_input::UserInput; use schemars::JsonSchema; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)] @@ -206,16 +206,16 @@ pub enum ReasoningItemContent { Text { text: String }, } -impl From> for ResponseInputItem { - fn from(items: Vec) -> Self { +impl From> for ResponseInputItem { + fn from(items: Vec) -> Self { Self::Message { role: "user".to_string(), content: items .into_iter() .filter_map(|c| match c { - InputItem::Text { text } => Some(ContentItem::InputText { text }), - InputItem::Image { image_url } => Some(ContentItem::InputImage { image_url }), - InputItem::LocalImage { path } => match std::fs::read(&path) { + UserInput::Text { text } => Some(ContentItem::InputText { text }), + UserInput::Image { image_url } => Some(ContentItem::InputImage { image_url }), + UserInput::LocalImage { path } => match std::fs::read(&path) { Ok(bytes) => { let mime = mime_guess::from_path(&path) .first() diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index e22981745c..c683bb235c 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -14,12 +14,14 @@ use crate::ConversationId; use crate::config_types::ReasoningEffort as ReasoningEffortConfig; use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::custom_prompts::CustomPrompt; +use crate::items::TurnItem; use crate::message_history::HistoryEntry; use crate::models::ContentItem; use crate::models::ResponseItem; use crate::num_format::format_with_separators; use crate::parse_command::ParsedCommand; use crate::plan_tool::UpdatePlanArgs; +use crate::user_input::UserInput; use mcp_types::CallToolResult; use mcp_types::Resource as McpResource; use mcp_types::ResourceTemplate as McpResourceTemplate; @@ -62,14 +64,14 @@ pub enum Op { /// Input from the user UserInput { /// User input items, see `InputItem` - items: Vec, + items: Vec, }, /// Similar to [`Op::UserInput`], but contains additional context required /// for a turn of a [`crate::codex_conversation::CodexConversation`]. UserTurn { /// User input items, see `InputItem` - items: Vec, + items: Vec, /// `cwd` to use with the [`SandboxPolicy`] and potentially tool calls /// such as `local_shell`. @@ -403,28 +405,8 @@ impl SandboxPolicy { } } -/// User input -#[non_exhaustive] -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum InputItem { - Text { - text: String, - }, - /// Pre‑encoded data: URI image. - Image { - image_url: String, - }, - - /// Local image path provided by the user. This will be converted to an - /// `Image` variant (base64 data URL) during request serialization. - LocalImage { - path: std::path::PathBuf, - }, -} - /// Event Queue Entry - events from agent -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Event { /// Submission `id` that this event is correlated with. pub id: String, @@ -538,6 +520,23 @@ pub enum EventMsg { /// Exited review mode with an optional final result to apply. ExitedReviewMode(ExitedReviewModeEvent), + + ItemStarted(ItemStartedEvent), + ItemCompleted(ItemCompletedEvent), +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ItemStartedEvent { + pub thread_id: ConversationId, + pub turn_id: String, + pub item: TurnItem, +} + +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ItemCompletedEvent { + pub thread_id: ConversationId, + pub turn_id: String, + pub item: TurnItem, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] diff --git a/codex-rs/protocol/src/user_input.rs b/codex-rs/protocol/src/user_input.rs new file mode 100644 index 0000000000..881b996514 --- /dev/null +++ b/codex-rs/protocol/src/user_input.rs @@ -0,0 +1,24 @@ +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use ts_rs::TS; + +/// User input +#[non_exhaustive] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, TS, JsonSchema)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum UserInput { + Text { + text: String, + }, + /// Pre‑encoded data: URI image. + Image { + image_url: String, + }, + + /// Local image path provided by the user. This will be converted to an + /// `Image` variant (base64 data URL) during request serialization. + LocalImage { + path: std::path::PathBuf, + }, +} diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 8dc6454e4b..ff1fee20dc 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -23,7 +23,6 @@ use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::ExecCommandBeginEvent; use codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::ExitedReviewModeEvent; -use codex_core::protocol::InputItem; use codex_core::protocol::InputMessageKind; use codex_core::protocol::ListCustomPromptsResponseEvent; use codex_core::protocol::McpListToolsResponseEvent; @@ -45,6 +44,7 @@ use codex_core::protocol::WebSearchBeginEvent; use codex_core::protocol::WebSearchEndEvent; use codex_protocol::ConversationId; use codex_protocol::parse_command::ParsedCommand; +use codex_protocol::user_input::UserInput; use crossterm::event::KeyCode; use crossterm::event::KeyEvent; use crossterm::event::KeyEventKind; @@ -1316,14 +1316,14 @@ impl ChatWidget { self.capture_ghost_snapshot(); - let mut items: Vec = Vec::new(); + let mut items: Vec = Vec::new(); if !text.is_empty() { - items.push(InputItem::Text { text: text.clone() }); + items.push(UserInput::Text { text: text.clone() }); } for path in image_paths { - items.push(InputItem::LocalImage { path }); + items.push(UserInput::LocalImage { path }); } self.codex_op_tx @@ -1510,6 +1510,7 @@ impl ChatWidget { self.on_entered_review_mode(review_request) } EventMsg::ExitedReviewMode(review) => self.on_exited_review_mode(review), + EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => {} } }