Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::InputItem as CoreInputItem;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_login::ServerOptions as LoginServerOptions;
Expand All @@ -89,6 +88,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InputMessageKind;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_utils_json_to_toml::json_to_toml;
use std::collections::HashMap;
use std::ffi::OsStr;
Expand Down
67 changes: 44 additions & 23 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use crate::client_common::REVIEW_PROMPT;
use crate::event_mapping::map_response_item_to_event_messages;
use crate::function_tool::FunctionCallError;
use crate::review_format::format_review_findings_block;
use crate::state::ItemCollector;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::items::UserMessageItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::McpAuthStatus;
Expand Down Expand Up @@ -86,7 +89,6 @@ use crate::protocol::EventMsg;
use crate::protocol::ExecApprovalRequestEvent;
use crate::protocol::ExecCommandBeginEvent;
use crate::protocol::ExecCommandEndEvent;
use crate::protocol::InputItem;
use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::PatchApplyBeginEvent;
Expand Down Expand Up @@ -131,6 +133,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::user_input::UserInput;

pub mod compact;
use self::compact::build_compacted_history;
Expand Down Expand Up @@ -272,6 +275,7 @@ pub(crate) struct TurnContext {
pub(crate) tools_config: ToolsConfig,
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) item_collector: ItemCollector,
}

impl TurnContext {
Expand Down Expand Up @@ -360,6 +364,7 @@ impl Session {
provider: ModelProviderInfo,
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
tx_event: Sender<Event>,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
let model_family = find_family_for_model(&session_configuration.model)
Expand Down Expand Up @@ -404,6 +409,7 @@ impl Session {
tools_config,
is_review_mode: false,
final_output_json_schema: None,
item_collector: ItemCollector::new(tx_event, conversation_id, "turn_id".to_string()),
}
}

Expand Down Expand Up @@ -669,6 +675,7 @@ impl Session {
session_configuration.provider.clone(),
&session_configuration,
self.conversation_id,
self.get_tx_event(),
);
if let Some(final_schema) = updates.final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
Expand Down Expand Up @@ -1161,7 +1168,7 @@ impl Session {
}

/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
Expand Down Expand Up @@ -1369,6 +1376,11 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
}

current_context
.item_collector
.started_completed(TurnItem::UserMessage(UserMessageItem::new(&items)))
.await;

sess.spawn_task(Arc::clone(&current_context), sub.id, items, RegularTask)
.await;
previous_context = Some(current_context);
Expand Down Expand Up @@ -1480,7 +1492,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
let turn_context = sess.new_turn(SessionSettingsUpdate::default()).await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![InputItem::Text {
.inject_input(vec![UserInput::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
.await
Expand Down Expand Up @@ -1633,10 +1645,15 @@ async fn spawn_review_thread(
cwd: parent_turn_context.cwd.clone(),
is_review_mode: true,
final_output_json_schema: None,
item_collector: ItemCollector::new(
sess.get_tx_event(),
sess.conversation_id,
sub_id.to_string(),
),
};

// Seed the child task with the review prompt as the initial user message.
let input: Vec<InputItem> = vec![InputItem::Text {
let input: Vec<UserInput> = vec![UserInput::Text {
text: review_prompt,
}];
let tc = Arc::new(review_turn_context);
Expand Down Expand Up @@ -1674,7 +1691,7 @@ pub(crate) async fn run_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
task_kind: TaskKind,
cancellation_token: CancellationToken,
) -> Option<String> {
Expand Down Expand Up @@ -2842,6 +2859,15 @@ mod tests {
otel_event_manager: otel_event_manager.clone(),
};

let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
);

let session = Session {
conversation_id,
tx_event,
Expand All @@ -2851,13 +2877,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
};

let turn_context = Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
);
(session, turn_context)
}

Expand Down Expand Up @@ -2913,6 +2932,15 @@ mod tests {
otel_event_manager: otel_event_manager.clone(),
};

let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
tx_event.clone(),
));

let session = Arc::new(Session {
conversation_id,
tx_event,
Expand All @@ -2922,13 +2950,6 @@ mod tests {
next_internal_sub_id: AtomicU64::new(0),
});

let turn_context = Arc::new(Session::make_turn_context(
Some(Arc::clone(&auth_manager)),
&otel_event_manager,
session_configuration.provider.clone(),
&session_configuration,
conversation_id,
));
(session, turn_context, rx_event)
}

Expand All @@ -2949,7 +2970,7 @@ mod tests {
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_sub_id: String,
_input: Vec<InputItem>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
if self.listen_to_cancellation_token {
Expand All @@ -2973,7 +2994,7 @@ mod tests {
async fn abort_regular_task_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
Expand Down Expand Up @@ -3004,7 +3025,7 @@ mod tests {
async fn abort_gracefuly_emits_turn_aborted_only() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-regular".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "hello".to_string(),
}];
sess.spawn_task(
Expand Down Expand Up @@ -3032,7 +3053,7 @@ mod tests {
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx();
let sub_id = "sub-review".to_string();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: "start review".to_string(),
}];
sess.spawn_task(
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/codex/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::InputItem;
use crate::protocol::InputMessageKind;
use crate::protocol::TaskStartedEvent;
use crate::protocol::TurnContextItem;
Expand All @@ -24,6 +23,7 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;
use futures::prelude::*;

pub const SUMMARIZATION_PROMPT: &str = include_str!("../../templates/compact/prompt.md");
Expand All @@ -41,7 +41,7 @@ pub(crate) async fn run_inline_auto_compact_task(
turn_context: Arc<TurnContext>,
) {
let sub_id = sess.next_internal_sub_id();
let input = vec![InputItem::Text {
let input = vec![UserInput::Text {
text: SUMMARIZATION_PROMPT.to_string(),
}];
run_compact_task_inner(sess, turn_context, sub_id, input).await;
Expand All @@ -51,7 +51,7 @@ pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) -> Option<String> {
let start_event = Event {
id: sub_id.clone(),
Expand All @@ -68,7 +68,7 @@ async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut turn_input = sess
Expand Down
4 changes: 3 additions & 1 deletion codex-rs/core/src/rollout/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_) => false,
| EventMsg::ConversationPath(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_) => false,
}
}
68 changes: 68 additions & 0 deletions codex-rs/core/src/state/item_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use async_channel::Sender;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use tracing::error;

#[derive(Debug)]
pub(crate) struct ItemCollector {
thread_id: ConversationId,
turn_id: String,
tx_event: Sender<Event>,
}

impl ItemCollector {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't pay too much attention to this. I imagine it'll change a lot when we add more interesting items

pub fn new(
tx_event: Sender<Event>,
thread_id: ConversationId,
turn_id: String,
) -> ItemCollector {
ItemCollector {
tx_event,
thread_id,
turn_id,
}
}

pub async fn started(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemStarted(ItemStartedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item started event: {e}");
}
}

pub async fn completed(&self, item: TurnItem) {
let err = self
.tx_event
.send(Event {
id: self.turn_id.clone(),
msg: EventMsg::ItemCompleted(ItemCompletedEvent {
thread_id: self.thread_id,
turn_id: self.turn_id.clone(),
item,
}),
})
.await;
if let Err(e) = err {
error!("failed to send item completed event: {e}");
}
}

pub async fn started_completed(&self, item: TurnItem) {
self.started(item.clone()).await;
self.completed(item).await;
}
}
2 changes: 2 additions & 0 deletions codex-rs/core/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod item_collector;
mod service;
mod session;
mod turn;

pub(crate) use item_collector::ItemCollector;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/tasks/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use tokio_util::sync::CancellationToken;

use crate::codex::TurnContext;
use crate::codex::compact;
use crate::protocol::InputItem;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;

use super::SessionTask;
use super::SessionTaskContext;
Expand All @@ -25,7 +25,7 @@ impl SessionTask for CompactTask {
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
compact::run_compact_task(session.clone_session(), ctx, sub_id, input).await
Expand Down
Loading