Skip to content

Commit 5ce37b3

Browse files
sanity and claude authored
fix: implement persistent keep-alive timer to prevent connection timeouts (#1660)
Co-authored-by: Claude <[email protected]>
1 parent 840927f commit 5ce37b3

File tree

1 file changed

+68
-9
lines changed

1 file changed

+68
-9
lines changed

crates/core/src/transport/peer_connection.rs

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub(crate) struct PeerConnection {
102102
failure_count: usize,
103103
first_failure_time: Option<std::time::Instant>,
104104
last_packet_report_time: Instant,
105+
keep_alive_handle: Option<JoinHandle<()>>,
105106
}
106107

107108
impl std::fmt::Debug for PeerConnection {
@@ -112,6 +113,15 @@ impl std::fmt::Debug for PeerConnection {
112113
}
113114
}
114115

116+
/// Stops the background keep-alive task when a `PeerConnection` is dropped.
///
/// `PeerConnection::new` spawns the keep-alive task with `tokio::spawn` and
/// stores its `JoinHandle` in `keep_alive_handle`. Dropping a tokio
/// `JoinHandle` only detaches the task, so the task is aborted explicitly
/// here; otherwise it would keep running until its outbound channel closes
/// or packet creation fails.
impl Drop for PeerConnection {
117+
fn drop(&mut self) {
118+
// `take()` moves the handle out and leaves `None` in the field.
if let Some(handle) = self.keep_alive_handle.take() {
119+
tracing::debug!(remote = ?self.remote_conn.remote_addr, "Cancelling keep-alive task");
120+
// Request cancellation of the spawned task; this does not wait for it to finish.
handle.abort();
121+
}
122+
}
123+
}
124+
115125
#[cfg(test)]
116126
type PeerConnectionMock = (
117127
PeerConnection,
@@ -128,6 +138,57 @@ type RemoteConnectionMock = (
128138

129139
impl PeerConnection {
130140
pub(super) fn new(remote_conn: RemoteConnection) -> Self {
141+
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
142+
143+
// Start the keep-alive task before creating Self
144+
let remote_addr = remote_conn.remote_addr;
145+
let outbound_packets = remote_conn.outbound_packets.clone();
146+
let outbound_key = remote_conn.outbound_symmetric_key.clone();
147+
let last_packet_id = remote_conn.last_packet_id.clone();
148+
149+
let keep_alive_handle = tokio::spawn(async move {
150+
let mut interval = tokio::time::interval(KEEP_ALIVE_INTERVAL);
151+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
152+
153+
// Skip the first immediate tick
154+
interval.tick().await;
155+
156+
loop {
157+
interval.tick().await;
158+
159+
tracing::trace!(remote = ?remote_addr, "Keep-alive timer tick - sending NoOp");
160+
161+
// Create a NoOp packet
162+
let packet_id = last_packet_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
163+
let noop_packet = match SymmetricMessage::serialize_msg_to_packet_data(
164+
packet_id,
165+
SymmetricMessagePayload::NoOp,
166+
&outbound_key,
167+
vec![], // No receipts for keep-alive
168+
) {
169+
Ok(packet) => packet.prepared_send(),
170+
Err(e) => {
171+
tracing::error!(?e, "Failed to create keep-alive packet");
172+
break;
173+
}
174+
};
175+
176+
// Send the keep-alive packet
177+
if outbound_packets
178+
.send((remote_addr, noop_packet))
179+
.await
180+
.is_err()
181+
{
182+
tracing::debug!(remote = ?remote_addr, "Keep-alive task stopping - channel closed");
183+
break;
184+
}
185+
}
186+
187+
tracing::debug!(remote = ?remote_addr, "Keep-alive task exiting");
188+
});
189+
190+
tracing::info!(remote = ?remote_addr, "PeerConnection created with persistent keep-alive task");
191+
131192
Self {
132193
remote_conn,
133194
received_tracker: ReceivedPacketTracker::new(),
@@ -137,6 +198,7 @@ impl PeerConnection {
137198
failure_count: 0,
138199
first_failure_time: None,
139200
last_packet_report_time: Instant::now(),
201+
keep_alive_handle: Some(keep_alive_handle),
140202
}
141203
}
142204

@@ -226,14 +288,13 @@ impl PeerConnection {
226288
// listen for incoming messages or receipts or wait until is time to do anything else again
227289
let mut resend_check = Some(tokio::time::sleep(tokio::time::Duration::from_millis(10)));
228290

229-
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
230291
const KILL_CONNECTION_AFTER: Duration = Duration::from_secs(30);
231-
232-
let mut keep_alive = tokio::time::interval(KEEP_ALIVE_INTERVAL);
233-
keep_alive.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
234-
keep_alive.tick().await;
235292
let mut last_received = std::time::Instant::now();
236293

294+
// Check for timeout periodically
295+
let mut timeout_check = tokio::time::interval(Duration::from_secs(5));
296+
timeout_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
297+
237298
const FAILURE_TIME_WINDOW: Duration = Duration::from_secs(30);
238299
loop {
239300
// tracing::trace!(remote = ?self.remote_conn.remote_addr, "waiting for inbound messages");
@@ -422,13 +483,11 @@ impl PeerConnection {
422483
};
423484
res.map_err(|e| TransportError::Other(e.into()))??
424485
}
425-
_ = keep_alive.tick() => {
486+
_ = timeout_check.tick() => {
426487
if last_received.elapsed() > KILL_CONNECTION_AFTER {
427-
tracing::warn!(remote = ?self.remote_conn.remote_addr, "connection timed out");
488+
tracing::warn!(remote = ?self.remote_conn.remote_addr, "connection timed out - no packets received for {:?}", last_received.elapsed());
428489
return Err(TransportError::ConnectionClosed(self.remote_addr()));
429490
}
430-
tracing::trace!(remote = ?self.remote_conn.remote_addr, "sending keep-alive");
431-
self.noop(vec![]).await?;
432491
}
433492
_ = resend_check.take().unwrap_or(tokio::time::sleep(Duration::from_millis(10))) => {
434493
loop {

0 commit comments

Comments
 (0)