Skip to content

Commit 467f145

Browse files
feat: implement flood_publish option in GossipSub and add corresponding tests
1 parent 8bd69d6 commit 467f145

File tree

2 files changed

+124
-47
lines changed

2 files changed

+124
-47
lines changed

libp2p/pubsub/gossipsub.py

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,14 @@ def __init__(
140140
self.mcache = MessageCache(gossip_window, gossip_history)
141141

142142
# Whether to flood publish to all peers instead of following gossipsub
143-
# mesh/fanout logic. Kept as an option primarily for tests and
144-
# compatibility with the test factory. Default is False.
143+
# mesh/fanout logic when acting as the original publisher.
144+
# When enabled, this behaves as a hybrid between FloodSub and GossipSub:
145+
# - When this node is the original publisher: Message is sent to ALL peers
146+
# who are subscribed to the topic (flood publishing behavior)
147+
# - When this node is forwarding a message: Regular GossipSub behavior is used
148+
# This provides better reliability at publication time with a reasonable
149+
# bandwidth cost since it only affects the original publisher.
150+
# Default is False.
145151
self.flood_publish = flood_publish
146152

147153
# Create heartbeat timer
@@ -306,43 +312,52 @@ def _get_peers_to_send(
306312
if topic not in self.pubsub.peer_topics:
307313
continue
308314

309-
# direct peers
310-
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
311-
send_to.update(_direct_peers)
312-
313-
# floodsub peers
314-
floodsub_peers: set[ID] = {
315-
peer_id
316-
for peer_id in self.pubsub.peer_topics[topic]
317-
if peer_id in self.peer_protocol
318-
and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
319-
}
320-
send_to.update(floodsub_peers)
321-
322-
# gossipsub peers
323-
gossipsub_peers: set[ID] = set()
324-
if topic in self.mesh:
325-
gossipsub_peers = self.mesh[topic]
315+
# If flood_publish is enabled and we are the original publisher,
316+
# send to all peers in the topic (flood publishing behavior)
317+
if self.flood_publish and msg_forwarder == self.pubsub.my_id:
318+
for peer in self.pubsub.peer_topics[topic]:
319+
# TODO: add score threshold check when peer scoring is implemented
320+
# NOTE: direct peers should be exempt from that score check
321+
send_to.add(peer)
326322
else:
327-
# When we publish to a topic that we have not subscribe to, we randomly
328-
# pick `self.degree` number of peers who have subscribed to the topic
329-
# and add them as our `fanout` peers.
330-
topic_in_fanout: bool = topic in self.fanout
331-
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
332-
fanout_size = len(fanout_peers)
333-
if not topic_in_fanout or (
334-
topic_in_fanout and fanout_size < self.degree
335-
):
336-
if topic in self.pubsub.peer_topics:
337-
# Combine fanout peers with selected peers
338-
fanout_peers.update(
339-
self._get_in_topic_gossipsub_peers_from_minus(
340-
topic, self.degree - fanout_size, fanout_peers
323+
# Regular GossipSub routing logic
324+
# direct peers
325+
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
326+
send_to.update(_direct_peers)
327+
328+
# floodsub peers
329+
floodsub_peers: set[ID] = {
330+
peer_id
331+
for peer_id in self.pubsub.peer_topics[topic]
332+
if peer_id in self.peer_protocol
333+
and self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
334+
}
335+
send_to.update(floodsub_peers)
336+
337+
# gossipsub peers
338+
gossipsub_peers: set[ID] = set()
339+
if topic in self.mesh:
340+
gossipsub_peers = self.mesh[topic]
341+
else:
342+
# When we publish to a topic that we have not subscribed to, we randomly
343+
# pick `self.degree` number of peers who have subscribed to the topic
344+
# and add them as our `fanout` peers.
345+
topic_in_fanout: bool = topic in self.fanout
346+
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
347+
fanout_size = len(fanout_peers)
348+
if not topic_in_fanout or (
349+
topic_in_fanout and fanout_size < self.degree
350+
):
351+
if topic in self.pubsub.peer_topics:
352+
# Combine fanout peers with selected peers
353+
fanout_peers.update(
354+
self._get_in_topic_gossipsub_peers_from_minus(
355+
topic, self.degree - fanout_size, fanout_peers
356+
)
341357
)
342-
)
343-
self.fanout[topic] = fanout_peers
344-
gossipsub_peers = fanout_peers
345-
send_to.update(gossipsub_peers)
358+
self.fanout[topic] = fanout_peers
359+
gossipsub_peers = fanout_peers
360+
send_to.update(gossipsub_peers)
346361
# Excludes `msg_forwarder` and `origin`
347362
yield from send_to.difference([msg_forwarder, origin])
348363

tests/core/pubsub/test_gossipsub.py

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,8 @@ async def test_sparse_connect():
602602

603603
@pytest.mark.trio
604604
async def test_flood_publish():
605+
"""Test that with flood_publish disabled, message propagation still works
606+
in a fully connected network topology."""
605607
async with PubsubFactory.create_batch_with_gossipsub(
606608
6,
607609
degree=2,
@@ -629,18 +631,78 @@ async def test_flood_publish():
629631
await pubsubs_gsub[0].publish(topic, msg_content)
630632

631633
# wait for messages to propagate
632-
await trio.sleep(0.5)
634+
await trio.sleep(1)
633635

634-
print(routers[0].mesh[topic])
635-
if routers[0].pubsub:
636-
print(routers[0].pubsub.peer_topics)
636+
# Debug info - only log if needed
637+
# print(f"Mesh for topic: {routers[0].mesh[topic]}")
638+
# if routers[0].pubsub:
639+
# print(f"Peer topics: {routers[0].pubsub.peer_topics}")
637640

638-
# verify all nodes received the message
639-
for queue in queues:
640-
msg = await queue.get()
641-
assert msg.data == msg_content, (
642-
f"node did not receive expected message: {msg.data}"
643-
)
641+
# verify all nodes received the message with timeout
642+
for i, queue in enumerate(queues):
643+
try:
644+
with trio.fail_after(5):
645+
msg = await queue.get()
646+
assert msg.data == msg_content, f"node {i} received wrong message: {msg.data}"
647+
except trio.TooSlowError:
648+
pytest.fail(f"Node {i} did not receive the message (timeout)")
649+
650+
# Test passed if all nodes received the message
651+
print("Basic flood test passed - all nodes received the message")
652+
653+
654+
@pytest.mark.trio
655+
async def test_flood_publish_enabled():
656+
"""Test that with flood_publish enabled, all nodes receive the message
657+
even with a sparse network topology."""
658+
# Create a network with flood_publish enabled
659+
async with PubsubFactory.create_batch_with_gossipsub(
660+
6,
661+
degree=2,
662+
degree_low=1,
663+
degree_high=3,
664+
flood_publish=True, # Enable flood_publish
665+
) as pubsubs_gsub:
666+
routers: list[GossipSub] = []
667+
for pubsub in pubsubs_gsub:
668+
assert isinstance(pubsub.router, GossipSub)
669+
routers.append(pubsub.router)
670+
hosts = [ps.host for ps in pubsubs_gsub]
671+
672+
topic = "flood_test_topic"
673+
queues = [await pubsub.subscribe(topic) for pubsub in pubsubs_gsub]
674+
675+
# Create a sparse topology - connect the nodes in a chain only.
676+
# flood_publish affects just the publisher's first hop (its subscribed peers);
677+
# nodes further along the chain still rely on regular GossipSub forwarding.
678+
await connect(hosts[0], hosts[1])
679+
await connect(hosts[1], hosts[2])
680+
await connect(hosts[2], hosts[3])
681+
await connect(hosts[3], hosts[4])
682+
await connect(hosts[4], hosts[5])
683+
684+
# wait for connections to be established
685+
await trio.sleep(1)
686+
687+
# publish a message from the first host
688+
msg_content = b"flood_publish_msg"
689+
await pubsubs_gsub[0].publish(topic, msg_content)
690+
691+
# wait for messages to propagate
692+
await trio.sleep(2)
693+
694+
# verify all nodes received the message with timeout
695+
for i, queue in enumerate(queues):
696+
try:
697+
with trio.fail_after(5):
698+
msg = await queue.get()
699+
assert msg.data == msg_content, f"node {i} received wrong message: {msg.data}"
700+
print(f"Node {i} received message correctly")
701+
except trio.TooSlowError:
702+
pytest.fail(f"Node {i} did not receive the message (timeout)")
703+
704+
# Test passed if all nodes received the message
705+
print("Flood publish test passed - all nodes received the message")
644706

645707

646708
@pytest.mark.trio

0 commit comments

Comments
 (0)