Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::InputItem as CoreInputItem;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_login::ServerOptions as LoginServerOptions;
Expand All @@ -89,6 +88,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_utils_json_to_toml::json_to_toml;
use std::collections::HashMap;
use std::ffi::OsStr;
Expand Down
67 changes: 44 additions & 23 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::McpAuthStatus;
Expand Down Expand Up @@ -77,7 +80,6 @@ use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::InputItem;
use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
Expand Down Expand Up @@ -122,6 +124,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::user_input::UserInput;

pub mod compact;
use self::compact::build_compacted_history;
Expand Down Expand Up @@ -264,6 +267,7 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) item_collector: ItemCollector,
}

impl TurnContext {
Expand Down Expand Up @@ -352,6 +356,7 @@ impl Session {
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
let model_family = find_family_for_model(&session_configuration.model)
Expand Down Expand Up @@ -397,6 +402,7 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()),
}
}

Expand Down Expand Up @@ -656,6 +662,7 @@ impl Session {
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
);
if let Some(final_schema) = updates.final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
Expand Down Expand Up @@ -986,7 +993,7 @@ impl Session {
}

/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
Expand Down Expand Up @@ -1157,6 +1164,11 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
}

current_context
.item_collector
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;

sess.spawn_task(Arc::clone(&current_context), sub.id, items, RegularTask)
.await;
previous_context = Some(current_context);
Expand Down Expand Up @@ -1268,7 +1280,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![InputItem::Text {
.inject_input(vec![UserInput::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
.await
Expand Down Expand Up @@ -1422,10 +1434,15 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
item_collector: ItemCollector::new(
sess.get_tx_event(),
sess.conversation_id,
sub_id.to_string(),
),
};

// Seed the child task with the review prompt as the initial user message.
let input: Vec<InputItem> = vec![InputItem::Text {
let input: Vec<UserInput> = vec![UserInput::Text {
text: review_prompt,
}];
let tc = Arc::new(review_turn_context);
Expand Down Expand Up @@ -1463,7 +1480,7 @@ pub(crate) async fn run_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> Option<String> {
Expand Down Expand Up @@ -2624,6 +2641,15 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};

let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
);

let session = Session {
conversation_id,
tx_event,
Expand All @@ -2633,13 +2659,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
};

let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
);
(session, turn_context)
}

Expand Down Expand Up @@ -2690,6 +2709,15 @@ mod tests {
tool_approvals: Mutex::new(ApprovalStore::default()),
};

let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
));

let session = Arc::new(Session {
conversation_id,
tx_event,
Expand All @@ -2699,13 +2727,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
});

let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
));
(session, turn_context, rx_event)
}

Expand All @@ -2726,7 +2747,7 @@ mod tests {
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_sub_id: String,
_input: Vec<InputItem>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
if self.listen_to_cancellation_token {
Expand All @@ -2750,7 +2771,7 @@ mod tests {
async fn abort_regular_task_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
Expand Down Expand Up @@ -2781,7 +2802,7 @@ mod tests {
async fn abort_gracefuly_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
Expand Down Expand Up @@ -2809,7 +2830,7 @@ mod tests {
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-review".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "start review".to_string(),
}];
sess.spawn_task(
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/codex/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::InputItem;
use crate::protocol::InputMessageKind;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
Expand All @@ -24,6 +23,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;

pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
Expand All @@ -41,7 +41,7 @@ pub(crate) async fn run_inline_auto_compact_task(
turn_context: Arc<TurnContext>,
) {
let sub_id = sess.next_internal_sub_id();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: SUMMARIZATION_PROMPT.to_string(),
}];
run_compact_task_inner(sess, turn_context, sub_id, input).await;
Expand All @@ -51,7 +51,7 @@ pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) -> Option<String> {
let start_event = Event {
id: sub_id.clone(),
Expand All @@ -68,7 +68,7 @@ async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut turn_input = sess
Expand Down
4 changes: 3 additions & 1 deletion codex-rs/core/src/rollout/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_) => false,
| EventMsg::ConversationPath(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_) => false,
}
}
68 changes: 68 additions & 0 deletions codex-rs/core/src/state/item_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use async_channel::Sender;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use tracing::error;

#[derive(Debug)]
pub(crate) struct ItemCollector {
thread_id: ConversationId,
turn_id: String,
tx_event: Sender<Event>,
}

impl ItemCollector {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't pay too much attention to this. I imagine it'll change a lot when we add more interesting items

pub fn new(
tx_event: Sender<Event>,
thread_id: ConversationId,
turn_id: String,
) -> ItemCollector {
ItemCollector {
tx_event,
thread_id,
turn_id,
}
}

pub async fn started(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item started event: {e}");
}
}

pub async fn completed(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item completed event: {e}");
}
}

pub async fn started_completed(&self, item: TurnItem) {
self.started(item.clone()).await;
self.completed(item).await;
}
}
2 changes: 2 additions & 0 deletions codex-rs/core/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod item_collector;
mod service;
mod session;
mod turn;

pub(crate) use item_collector::ItemCollector;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/tasks/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken;

use crate::codex::TurnContext;
use crate::codex::compact;
use crate::protocol::InputItem;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;

use super::SessionTask;
use super::SessionTaskContext;
Expand All @@ -25,7 +25,7 @@ impl SessionTask for CompactTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
compact::run_compact_task(session.clone_session(), ctx, sub_id, input).await
Expand Down
Loading
Loading