Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0a73b70
draft meshWSPriority
algorandskiy Jul 18, 2025
bd1e846
generalize
algorandskiy Jul 24, 2025
4ba908c
debug logging fix
algorandskiy Aug 11, 2025
a1d6ad9
rename ps metadata key prop
algorandskiy Aug 15, 2025
e935a44
p2p: no ws streams for non-dialied outgoing peers
algorandskiy Aug 15, 2025
dd1293d
provide gossipSubParams D values
algorandskiy Aug 19, 2025
6f4e8e8
enable peer exchange
algorandskiy Aug 19, 2025
7c8cc68
allow p2p meshThreadInner called for relays with GossipFanout=0
algorandskiy Aug 25, 2025
7187dfe
fix some network flakiness
algorandskiy Aug 25, 2025
ca0ead5
fix linter
algorandskiy Aug 25, 2025
2507146
remove unused noopMeshPubSubFilteredCreator
algorandskiy Aug 26, 2025
706cab0
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Sep 15, 2025
f6f9d06
CR: enforce outgoing conn in dialNode for hybrid relays only
algorandskiy Sep 15, 2025
580d7ce
fix linter
algorandskiy Sep 15, 2025
e62c8dc
hybrid mesh: delay p2p backup
algorandskiy Sep 18, 2025
f3fa8ad
fixes to backup logic
algorandskiy Sep 19, 2025
8a367f8
add networkAdvanceMonitor
algorandskiy Sep 17, 2025
1bc13d1
generalize conn perf logic and use for p2p
algorandskiy Sep 18, 2025
bd39dc9
add conn perf monitor tests
algorandskiy Sep 25, 2025
e874534
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Sep 25, 2025
73522ea
CR: use defer
algorandskiy Oct 1, 2025
5337292
CR feedback: fix P2PNetwork.meshThreadInner
algorandskiy Oct 15, 2025
0450f23
CR feedback: call p2p mesh thread even if no p2p conns needed to upda…
algorandskiy Oct 17, 2025
4127019
CR feedback: count only self-initiated conns in DialPeersUntilTargetC…
algorandskiy Oct 17, 2025
967b5af
fix linter
algorandskiy Oct 17, 2025
9465158
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 17, 2025
834d13f
CR: fan in meshUpdateRequests in a loop
algorandskiy Oct 21, 2025
94825d5
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 21, 2025
c7a355f
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 24, 2025
7378c85
fix linter
algorandskiy Oct 24, 2025
5c46204
fix linter
algorandskiy Oct 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
var childP2PNetMeshCreator MeshCreator = meshCreator
var hybridMeshCreator MeshCreator = noopMeshCreator{}
_, isHybridMeshCreator := meshCreator.(hybridRelayMeshCreator)
if meshCreator == nil && cfg.IsHybridServer() || isHybridMeshCreator {
if (meshCreator == nil && cfg.IsHybridServer()) || isHybridMeshCreator {
// no mesh creator provided and this node is a listening/relaying node
// then override and use hybrid relay meshing
// or, if a hybrid relay meshing requested explicitly, do the same
childWsNetMeshCreator = noopMeshCreator{}
childP2PNetMeshCreator = noopMeshPubSubFilteredCreator{}
childP2PNetMeshCreator = noopMeshCreator{}
hybridMeshCreator = hybridRelayMeshCreator{}
}

Expand All @@ -77,6 +77,7 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
}

hybridMesh, err := hybridMeshCreator.create(
withTargetConnCount(cfg.GossipFanout),
withWebsocketNetwork(wsnet),
withP2PNetwork(p2pnet))
if err != nil {
Expand Down Expand Up @@ -187,7 +188,12 @@ func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func
}

// RequestConnectOutgoing implements GossipNode
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {}
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
_ = n.runParallel(func(net GossipNode) error {
net.RequestConnectOutgoing(replace, quit)
return nil
})
}

// GetPeers implements GossipNode
func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer {
Expand Down
2 changes: 2 additions & 0 deletions network/hybridNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableP2PHybridMode = true
cfg.DNSBootstrapID = ""
log := logging.TestingLog(t)
const p2pKeyDir = ""

Expand Down Expand Up @@ -208,6 +209,7 @@ func TestHybridNetwork_HybridRelayStrategy(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableP2PHybridMode = true
cfg.DNSBootstrapID = ""
log := logging.TestingLog(t)

genesisInfo := GenesisInfo{genesisID, "net"}
Expand Down
54 changes: 35 additions & 19 deletions network/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/p2p"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)
Expand All @@ -43,10 +44,11 @@ type baseMesher struct {

type meshConfig struct {
parentCtx context.Context
targetConnCount int
meshUpdateRequests chan meshRequest
meshThreadInterval time.Duration
backoff backoff.BackoffStrategy
netMeshFn func() bool
netMeshFn func(int) int
peerStatReporter func()
closer func()

Expand All @@ -68,7 +70,7 @@ func withMeshExpJitterBackoff() meshOption {
cfg.backoff = eb
}
}
func withMeshNetMeshFn(netMeshFn func() bool) meshOption {
func withMeshNetMeshFn(netMeshFn func(int) int) meshOption {
return func(cfg *meshConfig) {
cfg.netMeshFn = netMeshFn
}
Expand Down Expand Up @@ -102,6 +104,12 @@ func withContext(ctx context.Context) meshOption {
}
}

func withTargetConnCount(targetConnCount int) meshOption {
return func(cfg *meshConfig) {
cfg.targetConnCount = targetConnCount
}
}

func withWebsocketNetwork(wsnet *WebsocketNetwork) meshOption {
return func(cfg *meshConfig) {
cfg.wsnet = wsnet
Expand All @@ -128,6 +136,9 @@ func newBaseMesher(opts ...meshOption) (*baseMesher, error) {
if cfg.meshUpdateRequests == nil {
return nil, errors.New("mesh update requests channel is not set")
}
if cfg.targetConnCount == 0 {
logging.Base().Warn("target connection count not set, not connecting to any peers")
}
if cfg.meshThreadInterval == 0 {
cfg.meshThreadInterval = meshThreadInterval
}
Expand Down Expand Up @@ -155,9 +166,9 @@ func (m *baseMesher) meshThread() {
return
}

hasPeers := m.netMeshFn()
numOutgoing := m.netMeshFn(m.targetConnCount)
if m.backoff != nil {
if hasPeers {
if numOutgoing > 0 {
// found something, reset timer to the configured value
timer.Reset(m.meshThreadInterval)
m.backoff.Reset()
Expand Down Expand Up @@ -229,10 +240,29 @@ func (c hybridRelayMeshCreator) create(opts ...meshOption) (mesher, error) {
out := make(chan meshRequest, 5)
var wg sync.WaitGroup

meshFn := func(targetConnCount int) int {
wsConnections := cfg.wsnet.meshThreadInner(targetConnCount)

var p2pConnections int
if wsConnections <= targetConnCount {
// note "less or equal". Even if p2pTarget is zero it makes sense to call
// p2p meshThreadInner to fetch DHT peers
p2pTarget := targetConnCount - wsConnections
p2pConnections = cfg.p2pnet.meshThreadInner(p2pTarget)

if cfg.wsnet.log.GetLevel() >= logging.Debug {
cfg.wsnet.log.Debugf("Hybrid WS-priority mesh: WS out connections=%d, P2P out connections=%d, target=%d",
wsConnections, p2pConnections, targetConnCount)
}
}
return wsConnections + p2pConnections
}

ctx := cfg.wsnet.ctx
mesh, err := newBaseMesher(
withContext(ctx),
withMeshNetMeshFn(cfg.wsnet.meshThreadInner),
withTargetConnCount(cfg.wsnet.config.GossipFanout),
withMeshNetMeshFn(meshFn),
withMeshPeerStatReporter(func() {
cfg.p2pnet.peerStater.sendPeerConnectionsTelemetryStatus(cfg.wsnet)
cfg.p2pnet.peerStater.sendPeerConnectionsTelemetryStatus(cfg.p2pnet)
Expand Down Expand Up @@ -289,17 +319,3 @@ type noopMesh struct{}

func (m *noopMesh) start() {}
func (m *noopMesh) stop() {}

type noopMeshPubSubFilteredCreator struct{}

func (c noopMeshPubSubFilteredCreator) create(opts ...meshOption) (mesher, error) {
return &noopMesh{}, nil
}
func (c noopMeshPubSubFilteredCreator) makeConfig(wsnet *WebsocketNetwork, p2pnet *P2PNetwork) networkConfig {
return networkConfig{
pubsubOpts: []p2p.PubSubOption{
p2p.DisablePubSubPeerExchange(),
p2p.SetPubSubPeerFilter(p2pnet.p2pRelayPeerFilter, p2pnet.pstore),
},
}
}
93 changes: 93 additions & 0 deletions network/mesh_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (C) 2019-2025 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"context"
"net/http"
"sync/atomic"
"testing"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network/limitcaller"
p2piface "github.com/algorand/go-algorand/network/p2p"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)

// mockP2PService implements p2p.Service and counts DialPeersUntilTargetCount invocations.
// It relies on p2p's meshThreadInner's defer of DialPeersUntilTargetCount to detect invocation.
type mockP2PService struct{ dialCount atomic.Int32 }

func (m *mockP2PService) Start() error { return nil }
func (m *mockP2PService) Close() error { return nil }
func (m *mockP2PService) ID() peer.ID { return "" }
func (m *mockP2PService) IDSigner() *p2piface.PeerIDChallengeSigner { return nil }
func (m *mockP2PService) AddrInfo() peer.AddrInfo { return peer.AddrInfo{} }
func (m *mockP2PService) NetworkNotify(network.Notifiee) {}
func (m *mockP2PService) NetworkStopNotify(network.Notifiee) {}
func (m *mockP2PService) DialPeersUntilTargetCount(int) { m.dialCount.Add(1) }
func (m *mockP2PService) ClosePeer(peer.ID) error { return nil }
func (m *mockP2PService) Conns() []network.Conn { return nil }
func (m *mockP2PService) ListPeersForTopic(string) []peer.ID { return nil }
func (m *mockP2PService) Subscribe(string, pubsub.ValidatorEx) (p2piface.SubNextCancellable, error) {
return nil, nil
}
func (m *mockP2PService) Publish(context.Context, string, []byte) error { return nil }
func (m *mockP2PService) GetHTTPClient(*peer.AddrInfo, limitcaller.ConnectionTimeStore, time.Duration) (*http.Client, error) {
return &http.Client{}, nil
}

// TestMesh_HybridRelayP2PInnerCall ensures the wsConnections <= targetConnCount condition
// in the hybridRelayMeshCreator mesh function in order to make sure P2PNetwork.meshThreadInner is invoked
func TestMesh_HybridRelayP2PInnerCall(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

cfg := config.GetDefaultLocal()
cfg.GossipFanout = 0
cfg.DNSBootstrapID = ""
cfg.EnableP2PHybridMode = true
cfg.PublicAddress = "public-address"
cfg.NetAddress = "127.0.0.1:0"
cfg.P2PHybridNetAddress = "127.0.0.1:0"

log := logging.TestingLog(t)
genesisInfo := GenesisInfo{GenesisID: "test-genesis", NetworkID: protocol.NetworkID("test-network")}
net, err := NewHybridP2PNetwork(log, cfg, "", nil, genesisInfo, &nopeNodeInfo{}, nil)
require.NoError(t, err)

mockSvc := &mockP2PService{}
net.p2pNetwork.service = mockSvc
net.p2pNetwork.relayMessages = false // prevent pubsub startup

err = net.Start()
require.NoError(t, err)
defer net.Stop()

net.RequestConnectOutgoing(false, nil)
require.Eventually(t, func() bool {
// RequestConnectOutgoing queues mesh update request so we have to wait a bit
return mockSvc.dialCount.Load() > 0
}, 3*time.Second, 50*time.Millisecond, "expected DialPeersUntilTargetCount to be called")
}
7 changes: 7 additions & 0 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ const AlgorandWsProtocolV22 = "/algorand-ws/2.2.0"

const dialTimeout = 30 * time.Second

const psmdkDialed = "dialed"

// MakeHost creates a libp2p host but does not start listening.
// Use host.Network().Listen() on the returned address to start listening.
func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host.Host, string, error) {
Expand Down Expand Up @@ -302,6 +304,8 @@ func (s *serviceImpl) DialPeersUntilTargetCount(targetConnCount int) {
err := s.dialNode(context.Background(), peerInfo) // leaving the calls as blocking for now, to not over-connect beyond fanout
if err != nil {
s.log.Warnf("failed to connect to peer %s: %v", peerInfo.ID, err)
} else {
numOutgoingConns++
}
}
}
Expand All @@ -314,6 +318,9 @@ func (s *serviceImpl) dialNode(ctx context.Context, peer *peer.AddrInfo) error {
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
if err := s.host.Peerstore().Put(peer.ID, psmdkDialed, true); err != nil { // mark this peer as explicitly dialed
return err
}
return s.host.Connect(ctx, *peer)
}

Expand Down
Loading
Loading