Skip to content

Commit 7b77126

Browse files
authored
Merge branch 'main' into feat/shortcut-overlay
2 parents dee7ccb + 0269096 commit 7b77126

21 files changed

+694
-217
lines changed

codex-rs/core/src/client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,9 +398,15 @@ impl From<ResponseCompletedUsage> for TokenUsage {
398398
fn from(val: ResponseCompletedUsage) -> Self {
399399
TokenUsage {
400400
input_tokens: val.input_tokens,
401-
cached_input_tokens: val.input_tokens_details.map(|d| d.cached_tokens),
401+
cached_input_tokens: val
402+
.input_tokens_details
403+
.map(|d| d.cached_tokens)
404+
.unwrap_or(0),
402405
output_tokens: val.output_tokens,
403-
reasoning_output_tokens: val.output_tokens_details.map(|d| d.reasoning_tokens),
406+
reasoning_output_tokens: val
407+
.output_tokens_details
408+
.map(|d| d.reasoning_tokens)
409+
.unwrap_or(0),
404410
total_tokens: val.total_tokens,
405411
}
406412
}

codex-rs/core/src/codex.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ use crate::protocol::SessionConfiguredEvent;
9999
use crate::protocol::StreamErrorEvent;
100100
use crate::protocol::Submission;
101101
use crate::protocol::TaskCompleteEvent;
102+
use crate::protocol::TokenUsageInfo;
102103
use crate::protocol::TurnDiffEvent;
103104
use crate::protocol::WebSearchBeginEvent;
104105
use crate::rollout::RolloutRecorder;
@@ -261,6 +262,7 @@ struct State {
261262
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
262263
pending_input: Vec<ResponseInputItem>,
263264
history: ConversationHistory,
265+
token_info: Option<TokenUsageInfo>,
264266
}
265267

266268
/// Context for an initialized model agent
@@ -1767,15 +1769,23 @@ async fn try_run_turn(
17671769
response_id: _,
17681770
token_usage,
17691771
} => {
1770-
if let Some(token_usage) = token_usage {
1771-
sess.tx_event
1772-
.send(Event {
1773-
id: sub_id.to_string(),
1774-
msg: EventMsg::TokenCount(token_usage),
1775-
})
1776-
.await
1777-
.ok();
1778-
}
1772+
let info = {
1773+
let mut st = sess.state.lock_unchecked();
1774+
let info = TokenUsageInfo::new_or_append(
1775+
&st.token_info,
1776+
&token_usage,
1777+
turn_context.client.get_model_context_window(),
1778+
);
1779+
st.token_info = info.clone();
1780+
info
1781+
};
1782+
sess.tx_event
1783+
.send(Event {
1784+
id: sub_id.to_string(),
1785+
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
1786+
})
1787+
.await
1788+
.ok();
17791789

17801790
let unified_diff = turn_diff_tracker.get_unified_diff();
17811791
if let Ok(Some(unified_diff)) = unified_diff {
@@ -2841,13 +2851,21 @@ async fn drain_to_completed(
28412851
response_id: _,
28422852
token_usage,
28432853
}) => {
2844-
// some providers don't return token usage, so we default
2845-
// TODO: consider approximate token usage
2846-
let token_usage = token_usage.unwrap_or_default();
2854+
let info = {
2855+
let mut st = sess.state.lock_unchecked();
2856+
let info = TokenUsageInfo::new_or_append(
2857+
&st.token_info,
2858+
&token_usage,
2859+
turn_context.client.get_model_context_window(),
2860+
);
2861+
st.token_info = info.clone();
2862+
info
2863+
};
2864+
28472865
sess.tx_event
28482866
.send(Event {
28492867
id: sub_id.to_string(),
2850-
msg: EventMsg::TokenCount(token_usage),
2868+
msg: EventMsg::TokenCount(crate::protocol::TokenCountEvent { info }),
28512869
})
28522870
.await
28532871
.ok();

codex-rs/core/src/config.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -732,23 +732,24 @@ impl Config {
732732
.or(config_profile.model)
733733
.or(cfg.model)
734734
.unwrap_or_else(default_model);
735-
let model_family = find_family_for_model(&model).unwrap_or_else(|| {
736-
let supports_reasoning_summaries =
737-
cfg.model_supports_reasoning_summaries.unwrap_or(false);
738-
let reasoning_summary_format = cfg
739-
.model_reasoning_summary_format
740-
.unwrap_or(ReasoningSummaryFormat::None);
741-
ModelFamily {
742-
slug: model.clone(),
743-
family: model.clone(),
744-
needs_special_apply_patch_instructions: false,
745-
supports_reasoning_summaries,
746-
reasoning_summary_format,
747-
uses_local_shell_tool: false,
748-
apply_patch_tool_type: None,
749-
}
735+
736+
let mut model_family = find_family_for_model(&model).unwrap_or_else(|| ModelFamily {
737+
slug: model.clone(),
738+
family: model.clone(),
739+
needs_special_apply_patch_instructions: false,
740+
supports_reasoning_summaries: false,
741+
reasoning_summary_format: ReasoningSummaryFormat::None,
742+
uses_local_shell_tool: false,
743+
apply_patch_tool_type: None,
750744
});
751745

746+
if let Some(supports_reasoning_summaries) = cfg.model_supports_reasoning_summaries {
747+
model_family.supports_reasoning_summaries = supports_reasoning_summaries;
748+
}
749+
if let Some(model_reasoning_summary_format) = cfg.model_reasoning_summary_format {
750+
model_family.reasoning_summary_format = model_reasoning_summary_format;
751+
}
752+
752753
let openai_model_info = get_model_info(&model_family);
753754
let model_context_window = cfg
754755
.model_context_window

codex-rs/exec/src/event_processor_with_human_output.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,14 @@ impl EventProcessor for EventProcessorWithHumanOutput {
189189
}
190190
return CodexStatus::InitiateShutdown;
191191
}
192-
EventMsg::TokenCount(token_usage) => {
193-
ts_println!(self, "tokens used: {}", token_usage.blended_total());
192+
EventMsg::TokenCount(ev) => {
193+
if let Some(usage_info) = ev.info {
194+
ts_println!(
195+
self,
196+
"tokens used: {}",
197+
usage_info.total_token_usage.blended_total()
198+
);
199+
}
194200
}
195201
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
196202
if !self.answer_started {

codex-rs/login/src/server.rs

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
use std::io::Cursor;
2+
use std::io::Read;
3+
use std::io::Write;
24
use std::io::{self};
5+
use std::net::SocketAddr;
6+
use std::net::TcpStream;
37
use std::path::Path;
48
use std::path::PathBuf;
59
use std::sync::Arc;
610
use std::thread;
11+
use std::time::Duration;
712

813
use crate::pkce::PkceCodes;
914
use crate::pkce::generate_pkce;
@@ -85,7 +90,7 @@ pub fn run_login_server(opts: ServerOptions) -> io::Result<LoginServer> {
8590
let pkce = generate_pkce();
8691
let state = opts.force_state.clone().unwrap_or_else(generate_state);
8792

88-
let server = Server::http(format!("127.0.0.1:{}", opts.port)).map_err(io::Error::other)?;
93+
let server = bind_server(opts.port)?;
8994
let actual_port = match server.server_addr().to_ip() {
9095
Some(addr) => addr.port(),
9196
None => {
@@ -145,19 +150,24 @@ pub fn run_login_server(opts: ServerOptions) -> io::Result<LoginServer> {
145150
let response =
146151
process_request(&url_raw, &opts, &redirect_uri, &pkce, actual_port, &state).await;
147152

148-
let is_login_complete = matches!(response, HandledRequest::ResponseAndExit(_));
149-
match response {
150-
HandledRequest::Response(r) | HandledRequest::ResponseAndExit(r) => {
151-
let _ = tokio::task::spawn_blocking(move || req.respond(r)).await;
153+
let exit_result = match response {
154+
HandledRequest::Response(response) => {
155+
let _ = tokio::task::spawn_blocking(move || req.respond(response)).await;
156+
None
157+
}
158+
HandledRequest::ResponseAndExit { response, result } => {
159+
let _ = tokio::task::spawn_blocking(move || req.respond(response)).await;
160+
Some(result)
152161
}
153162
HandledRequest::RedirectWithHeader(header) => {
154163
let redirect = Response::empty(302).with_header(header);
155164
let _ = tokio::task::spawn_blocking(move || req.respond(redirect)).await;
165+
None
156166
}
157-
}
167+
};
158168

159-
if is_login_complete {
160-
break Ok(());
169+
if let Some(result) = exit_result {
170+
break result;
161171
}
162172
}
163173
}
@@ -181,7 +191,10 @@ pub fn run_login_server(opts: ServerOptions) -> io::Result<LoginServer> {
181191
enum HandledRequest {
182192
Response(Response<Cursor<Vec<u8>>>),
183193
RedirectWithHeader(Header),
184-
ResponseAndExit(Response<Cursor<Vec<u8>>>),
194+
ResponseAndExit {
195+
response: Response<Cursor<Vec<u8>>>,
196+
result: io::Result<()>,
197+
},
185198
}
186199

187200
async fn process_request(
@@ -276,8 +289,18 @@ async fn process_request(
276289
) {
277290
resp.add_header(h);
278291
}
279-
HandledRequest::ResponseAndExit(resp)
292+
HandledRequest::ResponseAndExit {
293+
response: resp,
294+
result: Ok(()),
295+
}
280296
}
297+
"/cancel" => HandledRequest::ResponseAndExit {
298+
response: Response::from_string("Login cancelled"),
299+
result: Err(io::Error::new(
300+
io::ErrorKind::Interrupted,
301+
"Login cancelled",
302+
)),
303+
},
281304
_ => HandledRequest::Response(Response::from_string("Not Found").with_status_code(404)),
282305
}
283306
}
@@ -316,6 +339,68 @@ fn generate_state() -> String {
316339
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
317340
}
318341

342+
fn send_cancel_request(port: u16) -> io::Result<()> {
343+
let addr: SocketAddr = format!("127.0.0.1:{port}")
344+
.parse()
345+
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
346+
let mut stream = TcpStream::connect_timeout(&addr, Duration::from_secs(2))?;
347+
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
348+
stream.set_write_timeout(Some(Duration::from_secs(2)))?;
349+
350+
stream.write_all(b"GET /cancel HTTP/1.1\r\n")?;
351+
stream.write_all(format!("Host: 127.0.0.1:{port}\r\n").as_bytes())?;
352+
stream.write_all(b"Connection: close\r\n\r\n")?;
353+
354+
let mut buf = [0u8; 64];
355+
let _ = stream.read(&mut buf);
356+
Ok(())
357+
}
358+
359+
fn bind_server(port: u16) -> io::Result<Server> {
360+
let bind_address = format!("127.0.0.1:{port}");
361+
let mut cancel_attempted = false;
362+
let mut attempts = 0;
363+
const MAX_ATTEMPTS: u32 = 10;
364+
const RETRY_DELAY: Duration = Duration::from_millis(200);
365+
366+
loop {
367+
match Server::http(&bind_address) {
368+
Ok(server) => return Ok(server),
369+
Err(err) => {
370+
attempts += 1;
371+
let is_addr_in_use = err
372+
.downcast_ref::<io::Error>()
373+
.map(|io_err| io_err.kind() == io::ErrorKind::AddrInUse)
374+
.unwrap_or(false);
375+
376+
// If the address is in use, there is probably another instance of the login server
377+
// running. Attempt to cancel it and retry.
378+
if is_addr_in_use {
379+
if !cancel_attempted {
380+
cancel_attempted = true;
381+
if let Err(cancel_err) = send_cancel_request(port) {
382+
eprintln!("Failed to cancel previous login server: {cancel_err}");
383+
}
384+
}
385+
386+
thread::sleep(RETRY_DELAY);
387+
388+
if attempts >= MAX_ATTEMPTS {
389+
return Err(io::Error::new(
390+
io::ErrorKind::AddrInUse,
391+
format!("Port {bind_address} is already in use"),
392+
));
393+
}
394+
395+
continue;
396+
}
397+
398+
return Err(io::Error::other(err));
399+
}
400+
}
401+
}
402+
}
403+
319404
struct ExchangedTokens {
320405
id_token: String,
321406
access_token: String,

codex-rs/login/tests/suite/login_server_e2e.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#![allow(clippy::unwrap_used)]
2+
use std::io;
23
use std::net::SocketAddr;
34
use std::net::TcpListener;
45
use std::thread;
6+
use std::time::Duration;
57

68
use base64::Engine;
79
use codex_login::ServerOptions;
@@ -177,3 +179,67 @@ async fn creates_missing_codex_home_dir() {
177179
"auth.json should be created even if parent dir was missing"
178180
);
179181
}
182+
183+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
184+
async fn cancels_previous_login_server_when_port_is_in_use() {
185+
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
186+
println!(
187+
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
188+
);
189+
return;
190+
}
191+
192+
let (issuer_addr, _issuer_handle) = start_mock_issuer();
193+
let issuer = format!("http://{}:{}", issuer_addr.ip(), issuer_addr.port());
194+
195+
let first_tmp = tempdir().unwrap();
196+
let first_codex_home = first_tmp.path().to_path_buf();
197+
198+
let first_opts = ServerOptions {
199+
codex_home: first_codex_home,
200+
client_id: codex_login::CLIENT_ID.to_string(),
201+
issuer: issuer.clone(),
202+
port: 0,
203+
open_browser: false,
204+
force_state: Some("cancel_state".to_string()),
205+
originator: "test_originator".to_string(),
206+
};
207+
208+
let first_server = run_login_server(first_opts).unwrap();
209+
let login_port = first_server.actual_port;
210+
let first_server_task = tokio::spawn(async move { first_server.block_until_done().await });
211+
212+
tokio::time::sleep(Duration::from_millis(100)).await;
213+
214+
let second_tmp = tempdir().unwrap();
215+
let second_codex_home = second_tmp.path().to_path_buf();
216+
217+
let second_opts = ServerOptions {
218+
codex_home: second_codex_home,
219+
client_id: codex_login::CLIENT_ID.to_string(),
220+
issuer,
221+
port: login_port,
222+
open_browser: false,
223+
force_state: Some("cancel_state_2".to_string()),
224+
originator: "test_originator".to_string(),
225+
};
226+
227+
let second_server = run_login_server(second_opts).unwrap();
228+
assert_eq!(second_server.actual_port, login_port);
229+
230+
let cancel_result = first_server_task
231+
.await
232+
.expect("first login server task panicked")
233+
.expect_err("login server should report cancellation");
234+
assert_eq!(cancel_result.kind(), io::ErrorKind::Interrupted);
235+
236+
let client = reqwest::Client::new();
237+
let cancel_url = format!("http://127.0.0.1:{login_port}/cancel");
238+
let resp = client.get(cancel_url).send().await.unwrap();
239+
assert!(resp.status().is_success());
240+
241+
second_server
242+
.block_until_done()
243+
.await
244+
.expect_err("second login server should report cancellation");
245+
}

0 commit comments

Comments
 (0)