@@ -6,6 +6,7 @@ use std::io;
66use std:: path:: Path ;
77use std:: path:: PathBuf ;
88use std:: process:: ExitStatus ;
9+ use std:: sync:: Arc ;
910use std:: time:: Duration ;
1011use std:: time:: Instant ;
1112
@@ -84,6 +85,45 @@ pub struct StdoutStream {
8485 pub tx_event : Sender < Event > ,
8586}
8687
88+ type DeltaEventFn = dyn Fn ( & str , ExecOutputStream , Vec < u8 > ) -> EventMsg + Send + Sync ;
89+
90+ #[ derive( Clone ) ]
91+ pub struct DeltaEventBuilder {
92+ inner : Arc < DeltaEventFn > ,
93+ }
94+
95+ impl DeltaEventBuilder {
96+ pub fn exec_command ( ) -> Self {
97+ Self {
98+ inner : Arc :: new ( |call_id, stream, chunk| {
99+ EventMsg :: ExecCommandOutputDelta ( ExecCommandOutputDeltaEvent {
100+ call_id : call_id. to_string ( ) ,
101+ stream,
102+ chunk,
103+ is_user_shell_command : false ,
104+ } )
105+ } ) ,
106+ }
107+ }
108+
109+ pub fn user_command ( ) -> Self {
110+ Self {
111+ inner : Arc :: new ( |call_id, stream, chunk| {
112+ EventMsg :: ExecCommandOutputDelta ( ExecCommandOutputDeltaEvent {
113+ call_id : call_id. to_string ( ) ,
114+ stream,
115+ chunk,
116+ is_user_shell_command : true ,
117+ } )
118+ } ) ,
119+ }
120+ }
121+
122+ pub fn build ( & self , call_id : & str , stream : ExecOutputStream , chunk : Vec < u8 > ) -> EventMsg {
123+ ( self . inner ) ( call_id, stream, chunk)
124+ }
125+ }
126+
87127pub async fn process_exec_tool_call (
88128 params : ExecParams ,
89129 sandbox_type : SandboxType ,
@@ -138,6 +178,7 @@ pub(crate) async fn execute_exec_env(
138178 env : ExecEnv ,
139179 sandbox_policy : & SandboxPolicy ,
140180 stdout_stream : Option < StdoutStream > ,
181+ delta_event_builder : Option < DeltaEventBuilder > ,
141182) -> Result < ExecToolCallOutput > {
142183 let ExecEnv {
143184 command,
@@ -161,7 +202,15 @@ pub(crate) async fn execute_exec_env(
161202 } ;
162203
163204 let start = Instant :: now ( ) ;
164- let raw_output_result = exec ( params, sandbox, sandbox_policy, stdout_stream) . await ;
205+ let delta_event_builder = delta_event_builder. unwrap_or_else ( DeltaEventBuilder :: exec_command) ;
206+ let raw_output_result = exec (
207+ params,
208+ sandbox,
209+ sandbox_policy,
210+ stdout_stream,
211+ delta_event_builder. clone ( ) ,
212+ )
213+ . await ;
165214 let duration = start. elapsed ( ) ;
166215 finalize_exec_result ( raw_output_result, sandbox, duration)
167216}
@@ -434,6 +483,7 @@ async fn exec(
434483 sandbox : SandboxType ,
435484 sandbox_policy : & SandboxPolicy ,
436485 stdout_stream : Option < StdoutStream > ,
486+ delta_event_builder : DeltaEventBuilder ,
437487) -> Result < RawExecToolCallOutput > {
438488 #[ cfg( target_os = "windows" ) ]
439489 if sandbox == SandboxType :: WindowsRestrictedToken {
@@ -465,7 +515,7 @@ async fn exec(
465515 env,
466516 )
467517 . await ?;
468- consume_truncated_output ( child, timeout, stdout_stream) . await
518+ consume_truncated_output ( child, timeout, stdout_stream, delta_event_builder ) . await
469519}
470520
471521/// Consumes the output of a child process, truncating it so it is suitable for
@@ -474,6 +524,7 @@ async fn consume_truncated_output(
474524 mut child : Child ,
475525 timeout : Duration ,
476526 stdout_stream : Option < StdoutStream > ,
527+ delta_event_builder : DeltaEventBuilder ,
477528) -> Result < RawExecToolCallOutput > {
478529 // Both stdout and stderr were configured with `Stdio::piped()`
479530 // above, therefore `take()` should normally return `Some`. If it doesn't
@@ -497,12 +548,14 @@ async fn consume_truncated_output(
497548 stdout_stream. clone ( ) ,
498549 false ,
499550 Some ( agg_tx. clone ( ) ) ,
551+ delta_event_builder. clone ( ) ,
500552 ) ) ;
501553 let stderr_handle = tokio:: spawn ( read_capped (
502554 BufReader :: new ( stderr_reader) ,
503555 stdout_stream. clone ( ) ,
504556 true ,
505557 Some ( agg_tx. clone ( ) ) ,
558+ delta_event_builder. clone ( ) ,
506559 ) ) ;
507560
508561 let ( exit_status, timed_out) = tokio:: select! {
@@ -554,6 +607,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
554607 stream : Option < StdoutStream > ,
555608 is_stderr : bool ,
556609 aggregate_tx : Option < Sender < Vec < u8 > > > ,
610+ delta_event_builder : DeltaEventBuilder ,
557611) -> io:: Result < StreamOutput < Vec < u8 > > > {
558612 let mut buf = Vec :: with_capacity ( AGGREGATE_BUFFER_INITIAL_CAPACITY ) ;
559613 let mut tmp = [ 0u8 ; READ_CHUNK_SIZE ] ;
@@ -571,15 +625,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
571625 && emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
572626 {
573627 let chunk = tmp[ ..n] . to_vec ( ) ;
574- let msg = EventMsg :: ExecCommandOutputDelta ( ExecCommandOutputDeltaEvent {
575- call_id : stream. call_id . clone ( ) ,
576- stream : if is_stderr {
628+ let msg = delta_event_builder . build (
629+ & stream. call_id ,
630+ if is_stderr {
577631 ExecOutputStream :: Stderr
578632 } else {
579633 ExecOutputStream :: Stdout
580634 } ,
581635 chunk,
582- } ) ;
636+ ) ;
583637 let event = Event {
584638 id : stream. sub_id . clone ( ) ,
585639 msg,
0 commit comments