Skip to content

Commit 5fefc0e

Browse files
committed
address review comment
1 parent a98f878 commit 5fefc0e

File tree

2 files changed

+86
-35
lines changed

2 files changed

+86
-35
lines changed

libp2p/network/health/monitor.py

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async def run(self) -> None:
4949
logger.info("Starting ConnectionHealthMonitor service")
5050

5151
# Only run if health monitoring is enabled
52-
if not self._is_health_monitoring_enabled():
52+
if not self._is_health_monitoring_enabled:
5353
logger.debug("Health monitoring disabled, skipping monitor service")
5454
return
5555

@@ -128,16 +128,29 @@ async def _check_connection_health(self, peer_id: ID, conn: INetConn) -> None:
128128
try:
129129
# Skip checks during connection warmup window
130130
warmup = getattr(self.config, "health_warmup_window", 0.0)
131-
if warmup and hasattr(conn, "established_at"):
132-
# Fallback to event_started if established_at not present
133-
try:
131+
if warmup:
132+
# Check if we have health data with established_at timestamp
133+
if self._has_health_data(peer_id, conn):
134134
import time
135135

136-
established_at = getattr(conn, "established_at")
137-
if established_at and (time.time() - established_at) < warmup:
136+
health = self.swarm.health_data[peer_id][conn]
137+
if (
138+
health.established_at
139+
and (time.time() - health.established_at) < warmup
140+
):
141+
logger.debug(
142+
f"Skipping health check for {peer_id} during warmup window"
143+
)
138144
return
139-
except Exception:
140-
pass
145+
else:
146+
# If no health data yet, this is likely a new connection
147+
# Initialize health tracking and skip the first check
148+
self.swarm.initialize_connection_health(peer_id, conn)
149+
logger.debug(
150+
f"Skipping health check for {peer_id} - "
151+
f"initializing health data"
152+
)
153+
return
141154

142155
# Ensure health tracking is initialized
143156
if not self._has_health_data(peer_id, conn):
@@ -299,24 +312,15 @@ async def _replace_unhealthy_connection(
299312

300313
# Try to establish a new connection to maintain connectivity
301314
try:
302-
# Get peer info for dialing
303-
peer_info = self.swarm.peerstore.peer_info(peer_id)
304-
if peer_info and peer_info.addrs:
305-
logger.info(f"Attempting to dial new connection to {peer_id}")
306-
new_conn = await self.swarm.dial_peer(peer_id)
307-
if new_conn:
308-
logger.info(
309-
f"Successfully established replacement connection to "
310-
f"{peer_id}"
311-
)
312-
else:
313-
logger.warning(
314-
f"Failed to establish replacement connection to {peer_id}"
315-
)
315+
logger.info(f"Attempting to dial replacement connection to {peer_id}")
316+
new_conn = await self.swarm.dial_peer_replacement(peer_id)
317+
if new_conn:
318+
logger.info(
319+
f"Successfully established replacement connection to {peer_id}"
320+
)
316321
else:
317322
logger.warning(
318-
f"No addresses available for {peer_id}, "
319-
f"cannot establish replacement"
323+
f"Failed to establish replacement connection to {peer_id}"
320324
)
321325

322326
except Exception as e:
@@ -327,9 +331,10 @@ async def _replace_unhealthy_connection(
327331
except Exception as e:
328332
logger.error(f"Error replacing connection to {peer_id}: {e}")
329333

334+
@property
330335
def _is_health_monitoring_enabled(self) -> bool:
331336
"""Check if health monitoring is enabled."""
332-
return self.swarm._is_health_monitoring_enabled()
337+
return self.swarm._is_health_monitoring_enabled
333338

334339
def _has_health_data(self, peer_id: ID, conn: INetConn) -> bool:
335340
"""Check if health data exists for a connection."""
@@ -341,7 +346,7 @@ def _has_health_data(self, peer_id: ID, conn: INetConn) -> bool:
341346

342347
async def get_monitoring_status(self) -> HealthMonitorStatus:
343348
"""Get current monitoring status and statistics."""
344-
if not self._is_health_monitoring_enabled():
349+
if not self._is_health_monitoring_enabled:
345350
return HealthMonitorStatus(enabled=False)
346351

347352
total_connections = sum(len(conns) for conns in self.swarm.connections.values())

libp2p/network/swarm.py

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async def run(self) -> None:
159159
self.transport.set_swarm(self)
160160

161161
# Start health monitoring service if enabled
162-
if self._is_health_monitoring_enabled():
162+
if self._is_health_monitoring_enabled:
163163
from libp2p.network.health.monitor import ConnectionHealthMonitor
164164

165165
self._health_monitor = ConnectionHealthMonitor(self)
@@ -414,6 +414,51 @@ async def dial_addr(self, addr: Multiaddr, peer_id: ID) -> INetConn:
414414
"""
415415
return await self._dial_with_retry(addr, peer_id)
416416

417+
async def dial_peer_replacement(self, peer_id: ID) -> INetConn | None:
418+
"""
419+
Create a new connection to peer_id for replacement purposes.
420+
This bypasses the existing connection check and always creates a new connection.
421+
422+
:param peer_id: peer ID to dial
423+
:raises SwarmException: raised when an error occurs
424+
:return: new network connection or None if no addresses available
425+
"""
426+
logger.debug("attempting to dial replacement connection to peer %s", peer_id)
427+
428+
try:
429+
# Get peer info from peer store
430+
addrs = self.peerstore.addrs(peer_id)
431+
except PeerStoreError:
432+
logger.warning(f"No known addresses to peer {peer_id} for replacement")
433+
return None
434+
435+
if not addrs:
436+
logger.warning(f"No addresses available for {peer_id} for replacement")
437+
return None
438+
439+
exceptions: list[SwarmException] = []
440+
441+
# Try all known addresses with retry logic
442+
for multiaddr in addrs:
443+
try:
444+
connection = await self._dial_with_retry(multiaddr, peer_id)
445+
logger.info(
446+
f"Successfully established replacement connection to {peer_id}"
447+
)
448+
return connection
449+
except SwarmException as e:
450+
exceptions.append(e)
451+
logger.debug(
452+
"encountered swarm exception when trying to connect to %s, "
453+
"trying next address...",
454+
multiaddr,
455+
exc_info=e,
456+
)
457+
458+
# All addresses failed
459+
logger.warning(f"Failed to establish replacement connection to {peer_id}")
460+
return None
461+
417462
async def new_stream(self, peer_id: ID) -> INetStream:
418463
"""
419464
Enhanced: Create a new stream with load balancing across multiple connections.
@@ -839,6 +884,7 @@ async def notify_all(self, notifier: Callable[[INotifee], Awaitable[None]]) -> N
839884

840885
# Health monitoring methods (conditional on health monitoring being enabled)
841886

887+
@property
842888
def _is_health_monitoring_enabled(self) -> bool:
843889
"""Check if health monitoring is enabled."""
844890
return (
@@ -849,7 +895,7 @@ def _is_health_monitoring_enabled(self) -> bool:
849895

850896
def initialize_connection_health(self, peer_id: ID, connection: INetConn) -> None:
851897
"""Initialize health tracking for a new connection."""
852-
if not self._is_health_monitoring_enabled():
898+
if not self._is_health_monitoring_enabled:
853899
return
854900

855901
from libp2p.network.health.data_structures import (
@@ -871,7 +917,7 @@ def initialize_connection_health(self, peer_id: ID, connection: INetConn) -> Non
871917

872918
def cleanup_connection_health(self, peer_id: ID, connection: INetConn) -> None:
873919
"""Clean up health tracking for a closed connection."""
874-
if not self._is_health_monitoring_enabled():
920+
if not self._is_health_monitoring_enabled:
875921
return
876922

877923
if peer_id in self.health_data and connection in self.health_data[peer_id]:
@@ -885,7 +931,7 @@ def record_connection_event(
885931
) -> None:
886932
"""Record a connection lifecycle event."""
887933
if (
888-
self._is_health_monitoring_enabled()
934+
self._is_health_monitoring_enabled
889935
and peer_id in self.health_data
890936
and connection in self.health_data[peer_id]
891937
):
@@ -896,15 +942,15 @@ def record_connection_error(
896942
) -> None:
897943
"""Record a connection error."""
898944
if (
899-
self._is_health_monitoring_enabled()
945+
self._is_health_monitoring_enabled
900946
and peer_id in self.health_data
901947
and connection in self.health_data[peer_id]
902948
):
903949
self.health_data[peer_id][connection].add_error(error)
904950

905951
def get_peer_health_summary(self, peer_id: ID) -> dict[str, Any]:
906952
"""Get health summary for a specific peer."""
907-
if not self._is_health_monitoring_enabled():
953+
if not self._is_health_monitoring_enabled:
908954
return {}
909955

910956
if peer_id not in self.health_data:
@@ -942,7 +988,7 @@ def get_peer_health_summary(self, peer_id: ID) -> dict[str, Any]:
942988

943989
def get_global_health_summary(self) -> dict[str, Any]:
944990
"""Get global health summary across all peers."""
945-
if not self._is_health_monitoring_enabled():
991+
if not self._is_health_monitoring_enabled:
946992
return {}
947993

948994
all_peers = list(self.health_data.keys())
@@ -975,7 +1021,7 @@ def get_global_health_summary(self) -> dict[str, Any]:
9751021

9761022
def export_health_metrics(self, format: str = "json") -> str:
9771023
"""Export health metrics in various formats."""
978-
if not self._is_health_monitoring_enabled():
1024+
if not self._is_health_monitoring_enabled:
9791025
return "{}" if format == "json" else ""
9801026

9811027
summary = self.get_global_health_summary()
@@ -1018,7 +1064,7 @@ def _format_prometheus_metrics(self, summary: dict[str, Any]) -> str:
10181064

10191065
async def get_health_monitor_status(self) -> dict[str, Any]:
10201066
"""Get status information about the health monitoring service."""
1021-
if not self._is_health_monitoring_enabled() or self._health_monitor is None:
1067+
if not self._is_health_monitoring_enabled or self._health_monitor is None:
10221068
return {"enabled": False}
10231069

10241070
status = await self._health_monitor.get_monitoring_status()

0 commit comments

Comments
 (0)