Skip to content

Commit b52bcdc

Browse files
committed
Address review comment
1 parent b505eb6 commit b52bcdc

File tree

4 files changed

+53
-36
lines changed

4 files changed

+53
-36
lines changed

libp2p/abc.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1581,6 +1581,7 @@ async def close_peer(self, peer_id: ID) -> None:
15811581
15821582
"""
15831583

1584+
@abstractmethod
15841585
def get_peer_health_summary(self, peer_id: ID) -> dict[str, Any]:
15851586
"""
15861587
Get health summary for a specific peer.
@@ -1599,6 +1600,7 @@ def get_peer_health_summary(self, peer_id: ID) -> dict[str, Any]:
15991600
"""
16001601
return {}
16011602

1603+
@abstractmethod
16021604
def get_global_health_summary(self) -> dict[str, Any]:
16031605
"""
16041606
Get global health summary across all peers.
@@ -1612,6 +1614,7 @@ def get_global_health_summary(self) -> dict[str, Any]:
16121614
"""
16131615
return {}
16141616

1617+
@abstractmethod
16151618
def export_health_metrics(self, format: str = "json") -> str:
16161619
"""
16171620
Export health metrics in specified format.
@@ -1630,6 +1633,7 @@ def export_health_metrics(self, format: str = "json") -> str:
16301633
"""
16311634
return "{}" if format == "json" else ""
16321635

1636+
@abstractmethod
16331637
async def get_health_monitor_status(self) -> dict[str, Any]:
16341638
"""
16351639
Get status information about the health monitoring service.
@@ -1967,6 +1971,7 @@ def get_connection_health(self, peer_id: ID) -> dict[str, Any]:
19671971
Returns empty dict if health monitoring is disabled or peer not found.
19681972
19691973
"""
1974+
...
19701975

19711976
@abstractmethod
19721977
def get_network_health_summary(self) -> dict[str, Any]:
@@ -1980,6 +1985,7 @@ def get_network_health_summary(self) -> dict[str, Any]:
19801985
Returns empty dict if health monitoring is disabled.
19811986
19821987
"""
1988+
...
19831989

19841990
@abstractmethod
19851991
def export_health_metrics(self, format: str = "json") -> str:
@@ -1998,6 +2004,7 @@ def export_health_metrics(self, format: str = "json") -> str:
19982004
Returns empty string or object if health monitoring is disabled.
19992005
20002006
"""
2007+
...
20012008

20022009
@abstractmethod
20032010
async def get_health_monitor_status(self) -> dict[str, Any]:
@@ -2018,6 +2025,7 @@ async def get_health_monitor_status(self) -> dict[str, Any]:
20182025
Returns {"enabled": False} if health monitoring is disabled.
20192026
20202027
"""
2028+
...
20212029

20222030

20232031
# -------------------------- peer-record interface.py --------------------------

libp2p/network/health/monitor.py

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -277,40 +277,25 @@ def _should_replace_connection(self, peer_id: ID, conn: INetConn) -> bool:
277277
async def _replace_unhealthy_connection(
278278
self, peer_id: ID, old_conn: INetConn
279279
) -> None:
280-
"""Replace an unhealthy connection with a new one."""
280+
"""
281+
Replace an unhealthy connection with a new one.
282+
283+
This method establishes a new connection BEFORE closing the old one
284+
to avoid dropping below the minimum connection threshold.
285+
"""
281286
try:
282287
logger.info(f"Replacing unhealthy connection for peer {peer_id}")
283288

284-
# Check if we have enough connections remaining
289+
# Check current connection count
285290
current_connections = self.swarm.connections.get(peer_id, [])
286-
remaining_after_removal = len(current_connections) - 1
287-
288-
# Only remove if we have more than the minimum required
289-
if remaining_after_removal < self.config.min_connections_per_peer:
290-
logger.warning(
291-
f"Not replacing connection to {peer_id}: would go below minimum "
292-
f"({remaining_after_removal} < "
293-
f"{self.config.min_connections_per_peer})"
294-
)
295-
return
296-
297-
# Clean up health tracking first
298-
self.swarm.cleanup_connection_health(peer_id, old_conn)
299-
300-
# Remove from active connections
301-
if (
302-
peer_id in self.swarm.connections
303-
and old_conn in self.swarm.connections[peer_id]
304-
):
305-
self.swarm.connections[peer_id].remove(old_conn)
291+
current_count = len(current_connections)
306292

307-
# Close the unhealthy connection
308-
try:
309-
await old_conn.close()
310-
except Exception as e:
311-
logger.debug(f"Error closing unhealthy connection: {e}")
293+
# Strategy: Try to establish new connection first, then close
294+
# old one. This prevents us from being stuck with a bad
295+
# connection at the minimum threshold.
312296

313-
# Try to establish a new connection to maintain connectivity
297+
# First, try to establish a new connection
298+
new_conn = None
314299
try:
315300
logger.info(f"Attempting to dial replacement connection to {peer_id}")
316301
new_conn = await self.swarm.dial_peer_replacement(peer_id)
@@ -322,12 +307,39 @@ async def _replace_unhealthy_connection(
322307
logger.warning(
323308
f"Failed to establish replacement connection to {peer_id}"
324309
)
325-
326310
except Exception as e:
327311
logger.error(
328312
f"Error establishing replacement connection to {peer_id}: {e}"
329313
)
330314

315+
# If we successfully established a new connection, or if we have enough
316+
# connections to safely remove the bad one, proceed with cleanup
317+
if new_conn or current_count > self.config.min_connections_per_peer:
318+
# Clean up health tracking
319+
self.swarm.cleanup_connection_health(peer_id, old_conn)
320+
321+
# Remove from active connections
322+
if (
323+
peer_id in self.swarm.connections
324+
and old_conn in self.swarm.connections[peer_id]
325+
):
326+
self.swarm.connections[peer_id].remove(old_conn)
327+
328+
# Close the unhealthy connection
329+
try:
330+
await old_conn.close()
331+
logger.info(f"Closed unhealthy connection to {peer_id}")
332+
except Exception as e:
333+
logger.debug(f"Error closing unhealthy connection: {e}")
334+
else:
335+
# We couldn't establish a new connection and we're at minimum
336+
# Keep the unhealthy connection rather than having no connection
337+
logger.warning(
338+
f"Keeping unhealthy connection to {peer_id}: "
339+
f"failed to establish replacement and at minimum threshold "
340+
f"({current_count} connections)"
341+
)
342+
331343
except Exception as e:
332344
logger.error(f"Error replacing connection to {peer_id}: {e}")
333345

libp2p/pubsub/pubsub.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
StreamClosed,
5454
StreamEOF,
5555
StreamError,
56-
StreamReset,
5756
)
5857
from libp2p.peer.id import (
5958
ID,

tests/core/transport/test_websocket.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@
55
from typing import Any
66

77
import pytest
8-
9-
if hasattr(builtins, "ExceptionGroup"):
10-
ExceptionGroup = builtins.ExceptionGroup # type: ignore[misc]
11-
else:
12-
# Fallback for older Python versions
13-
ExceptionGroup = Exception # type: ignore[misc]
148
from multiaddr import Multiaddr
159
import trio
1610

@@ -31,6 +25,10 @@
3125

3226
logger = logging.getLogger(__name__)
3327

28+
# ExceptionGroup is a builtin only on Python 3.11+; fall back to Exception on older versions.
29+
# Use getattr to avoid type checker errors on Python 3.10
30+
ExceptionGroupType: type[BaseException] = getattr(builtins, "ExceptionGroup", Exception)
31+
3432
PLAINTEXT_PROTOCOL_ID = "/plaintext/1.0.0"
3533

3634

0 commit comments

Comments
 (0)