Commits (30)
0a73b70
draft meshWSPriority
algorandskiy Jul 18, 2025
bd1e846
generalize
algorandskiy Jul 24, 2025
4ba908c
debug logging fix
algorandskiy Aug 11, 2025
a1d6ad9
rename ps metadata key prop
algorandskiy Aug 15, 2025
e935a44
p2p: no ws streams for non-dialied outgoing peers
algorandskiy Aug 15, 2025
dd1293d
provide gossipSubParams D values
algorandskiy Aug 19, 2025
6f4e8e8
enable peer exchange
algorandskiy Aug 19, 2025
7c8cc68
allow p2p meshThreadInner called for relays with GossipFanout=0
algorandskiy Aug 25, 2025
7187dfe
fix some network flakiness
algorandskiy Aug 25, 2025
ca0ead5
fix linter
algorandskiy Aug 25, 2025
2507146
remove unused noopMeshPubSubFilteredCreator
algorandskiy Aug 26, 2025
706cab0
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Sep 15, 2025
f6f9d06
CR: enforce outgoing conn in dialNode for hybrid relays only
algorandskiy Sep 15, 2025
580d7ce
fix linter
algorandskiy Sep 15, 2025
e62c8dc
hybrid mesh: delay p2p backup
algorandskiy Sep 18, 2025
f3fa8ad
fixes to backup logic
algorandskiy Sep 19, 2025
8a367f8
add networkAdvanceMonitor
algorandskiy Sep 17, 2025
1bc13d1
generalize conn perf logic and use for p2p
algorandskiy Sep 18, 2025
bd39dc9
add conn perf monitor tests
algorandskiy Sep 25, 2025
e874534
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Sep 25, 2025
73522ea
CR: use defer
algorandskiy Oct 1, 2025
5337292
CR feedback: fix P2PNetwork.meshThreadInner
algorandskiy Oct 15, 2025
0450f23
CR feedback: call p2p mesh thread even if no p2p conns needed to upda…
algorandskiy Oct 17, 2025
4127019
CR feedback: count only self-initiated conns in DialPeersUntilTargetC…
algorandskiy Oct 17, 2025
967b5af
fix linter
algorandskiy Oct 17, 2025
9465158
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 17, 2025
834d13f
CR: fan in meshUpdateRequests in a loop
algorandskiy Oct 21, 2025
94825d5
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 21, 2025
c7a355f
Merge remote-tracking branch 'upstream/master' into pavel/ws-relays-p…
algorandskiy Oct 24, 2025
7378c85
fix linter
algorandskiy Oct 24, 2025
125 changes: 125 additions & 0 deletions network/connPerfMon.go
@@ -23,6 +23,7 @@ import (
"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/logging"
)

//msgp:ignore pmStage
@@ -384,3 +385,127 @@ func (pm *connectionPerformanceMonitor) accumulateMessage(msg *IncomingMessage,
delete(msgBucket.messages, msgDigest)
}
}

type networkAdvanceMonitor struct {
// lastNetworkAdvance contains the last timestamp at which the agreement protocol was able to make notable progress.
// it is used as a watchdog to help us detect connectivity issues (such as cliques)
lastNetworkAdvance time.Time

mu deadlock.Mutex
}

func makeNetworkAdvanceMonitor() *networkAdvanceMonitor {
return &networkAdvanceMonitor{
lastNetworkAdvance: time.Now().UTC(),
}
}

func (m *networkAdvanceMonitor) lastAdvancedWithin(interval time.Duration) bool {
m.mu.Lock()
defer m.mu.Unlock()
// now < last + interval <=> now - last < interval
return time.Now().UTC().Before(m.lastNetworkAdvance.Add(interval))
}

func (m *networkAdvanceMonitor) updateLastAdvance() {
m.mu.Lock()
defer m.mu.Unlock()
m.lastNetworkAdvance = time.Now().UTC()
}

type outgoingConnsCloser struct {
log logging.Logger
net outgoingDisconnectable
cliqueResolveInterval time.Duration
connPerfMonitor *connectionPerformanceMonitor
netAdvMonitor *networkAdvanceMonitor
}

type outgoingDisconnectable interface {
outgoingPeers() (peers []Peer)
numOutgoingPending() int
disconnect(badnode Peer, reason disconnectReason)
OnNetworkAdvance()
}

func makeOutgoingConnsCloser(log logging.Logger, net outgoingDisconnectable, connPerfMonitor *connectionPerformanceMonitor, cliqueResolveInterval time.Duration) *outgoingConnsCloser {
return &outgoingConnsCloser{
log: log,
net: net,
cliqueResolveInterval: cliqueResolveInterval,
connPerfMonitor: connPerfMonitor,
netAdvMonitor: makeNetworkAdvanceMonitor(),
}
}

// checkExistingConnectionsNeedDisconnecting checks whether an existing connection needs to be dropped due to
// performance issues and/or the network being stalled.
func (cc *outgoingConnsCloser) checkExistingConnectionsNeedDisconnecting(targetConnCount int) bool {
// we are already connected (or connecting) to GossipFanout peers.
// get the actual peers.
outgoingPeers := cc.net.outgoingPeers()
if len(outgoingPeers) < targetConnCount {
// reset the performance monitor.
cc.connPerfMonitor.Reset([]Peer{})
return cc.checkNetworkAdvanceDisconnect()
}

if !cc.connPerfMonitor.ComparePeers(outgoingPeers) {
// different set of peers. restart monitoring.
cc.connPerfMonitor.Reset(outgoingPeers)
}

// same set of peers.
peerStat := cc.connPerfMonitor.GetPeersStatistics()
if peerStat == nil {
// performance metrics are not yet ready.
return cc.checkNetworkAdvanceDisconnect()
}

// update peers with the performance metrics we've gathered.
var leastPerformingPeer *wsPeer
for _, stat := range peerStat.peerStatistics {
wsPeer := stat.peer.(*wsPeer)
wsPeer.peerMessageDelay = stat.peerDelay
cc.log.Infof("network performance monitor - peer '%s' delay %d first message portion %d%%", wsPeer.GetAddress(), stat.peerDelay, int(stat.peerFirstMessage*100))
if wsPeer.throttledOutgoingConnection && leastPerformingPeer == nil {
Contributor:

I went down a rabbit hole trying to understand what "throttled connection" means and why a peer would be throttled, and it was really helpful to read the PR description here: #668

Basically, it is a "best of both worlds" approach to avoid clique formation while still allowing some peers to be performance-ranked: relays keep half of their peers as random, stable connections (not disconnected unless an error occurs), while the other half are performance-ranked and the least-performing peer is kicked out periodically.

It's not clear whether we need to cargo-cult all of this over to the P2PNetwork use case, but we can get rid of it once we are fully using pubsub for gossip.

A better name might be monitoredOutgoingConnection or performanceMonitoredOutgoingConnection, perhaps.
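
For illustration, a minimal sketch of that split as I understand it; markThrottled is a hypothetical helper, not code from this PR:

// Hypothetical sketch of the half-random / half-performance-ranked policy
// described above; markThrottled does not exist in this PR.
func markThrottled(outgoingIndex int, gossipFanout int) bool {
	// connections filling the lower half of the fanout slots stay as stable,
	// random connections; the upper half are performance-monitored and
	// eligible for periodic eviction of the least-performing peer.
	return outgoingIndex >= gossipFanout/2
}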

leastPerformingPeer = wsPeer
}
}
if leastPerformingPeer == nil {
return cc.checkNetworkAdvanceDisconnect()
}
cc.net.disconnect(leastPerformingPeer, disconnectLeastPerformingPeer)
cc.connPerfMonitor.Reset([]Peer{})

return true
}

// checkNetworkAdvanceDisconnect uses the lastNetworkAdvance indicator to see if the network is currently "stuck".
// if it seems to be "stuck", a randomly picked peer is disconnected.
func (cc *outgoingConnsCloser) checkNetworkAdvanceDisconnect() bool {
if cc.netAdvMonitor.lastAdvancedWithin(cc.cliqueResolveInterval) {
return false
}
outgoingPeers := cc.net.outgoingPeers()
if len(outgoingPeers) == 0 {
return false
}
if cc.net.numOutgoingPending() > 0 {
// we're currently trying to extend the list of outgoing connections. no need to
// disconnect any existing connection to free up room for another connection.
return false
}
disconnectPeerIdx := crypto.RandUint63() % uint64(len(outgoingPeers))
peer := outgoingPeers[disconnectPeerIdx].(*wsPeer)

cc.net.disconnect(peer, disconnectCliqueResolve)
cc.connPerfMonitor.Reset([]Peer{})
cc.net.OnNetworkAdvance()
return true
}

func (cc *outgoingConnsCloser) updateLastAdvance() {
cc.netAdvMonitor.updateLastAdvance()
}
84 changes: 82 additions & 2 deletions network/connPerfMon_test.go
@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
)
@@ -92,7 +93,7 @@ func BenchmarkConnMonitor(b *testing.B) {
}
}

func TestConnMonitorStageTiming(t *testing.T) {
func TestConnMonitor_StageTiming(t *testing.T) {
partitiontest.PartitionTest(t)

peers := []Peer{&wsPeer{}, &wsPeer{}, &wsPeer{}, &wsPeer{}}
@@ -130,7 +131,7 @@ func TestConnMonitorStageTiming(t *testing.T) {
}

}
func TestBucketsPruning(t *testing.T) {
func TestConnMonitor_BucketsPruning(t *testing.T) {
partitiontest.PartitionTest(t)

bucketsCount := 100
@@ -160,3 +161,82 @@ func TestBucketsPruning(t *testing.T) {
require.Equal(t, bucketsCount-i, len(perfMonitor.pendingMessagesBuckets))
}
}

type mockOutgoingNet struct {
peers []Peer
pending int
disconnectedPeer Peer
disconnectReason disconnectReason
advanceCalled bool
}

func (m *mockOutgoingNet) outgoingPeers() (peers []Peer) { return m.peers }
func (m *mockOutgoingNet) numOutgoingPending() int { return m.pending }
func (m *mockOutgoingNet) disconnect(badnode Peer, reason disconnectReason) {
m.disconnectedPeer = badnode
m.disconnectReason = reason
}
func (m *mockOutgoingNet) OnNetworkAdvance() { m.advanceCalled = true }

func TestConnMonitor_CheckExistingConnections_ThrottledPeers(t *testing.T) {
partitiontest.PartitionTest(t)
mon := makeConnectionPerformanceMonitor(nil)

p1 := &wsPeer{throttledOutgoingConnection: true}
mockNet := &mockOutgoingNet{peers: []Peer{p1}}
cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)

res := cc.checkExistingConnectionsNeedDisconnecting(2)
require.False(t, res)
require.Nil(t, mockNet.disconnectedPeer)

p2 := &wsPeer{throttledOutgoingConnection: false} // not throttled
mockNet = &mockOutgoingNet{peers: []Peer{p1, p2}}
cc = makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)

mon.Reset(mockNet.peers)
mon.stage = pmStageStopped
mon.connectionDelay = map[Peer]int64{p1: 20, p2: 10}
mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 2}
mon.msgCount = 3

res = cc.checkExistingConnectionsNeedDisconnecting(2)
require.True(t, res, "expected disconnect")
require.Equal(t, p1, mockNet.disconnectedPeer)
require.Equal(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
}

func TestConnMonitor_CheckExistingConnections_NoThrottledPeers(t *testing.T) {
partitiontest.PartitionTest(t)
mon := makeConnectionPerformanceMonitor(nil)
p1 := &wsPeer{throttledOutgoingConnection: false}
p2 := &wsPeer{throttledOutgoingConnection: false}
mockNet := &mockOutgoingNet{peers: []Peer{p1, p2}}
cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 0)
mon.Reset(mockNet.peers)
mon.stage = pmStageStopped
mon.connectionDelay = map[Peer]int64{p1: 5, p2: 6}
mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 1}
mon.msgCount = 2

res := cc.checkExistingConnectionsNeedDisconnecting(2)
require.True(t, res)
require.NotNil(t, mockNet.disconnectedPeer)
require.NotEqual(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
}

func TestNetworkAdvanceMonitor(t *testing.T) {
partitiontest.PartitionTest(t)
m := makeNetworkAdvanceMonitor()

require.True(t, m.lastAdvancedWithin(500*time.Millisecond))

m.mu.Lock()
m.lastNetworkAdvance = time.Now().Add(-2 * time.Second)
m.mu.Unlock()
require.False(t, m.lastAdvancedWithin(500*time.Millisecond), "expected false after stale interval")

// update and verify within again
m.updateLastAdvance()
require.True(t, m.lastAdvancedWithin(500*time.Millisecond))
Contributor:

Might need more time on the Actions workflow... can we make this up to a few seconds?

Contributor:

(and the other cases like it)

Contributor (author):

these two calls are within the same thread, so imo this is unlikely but yes, possible. I suggest leaving it as is and mocking clocks only if it starts failing.
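
If it ever does start flaking, a minimal sketch of what clock injection could look like, assuming we add a hypothetical now field to the monitor (none of this is in the PR; imports are the usual time and go-deadlock):

// Hypothetical clock-injection sketch for tests; the "now" field does not exist in this PR.
type networkAdvanceMonitor struct {
	now                func() time.Time // time.Now in production, a fake in tests
	lastNetworkAdvance time.Time
	mu                 deadlock.Mutex
}

func (m *networkAdvanceMonitor) lastAdvancedWithin(interval time.Duration) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.now().UTC().Before(m.lastNetworkAdvance.Add(interval))
}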

}
13 changes: 10 additions & 3 deletions network/hybridNetwork.go
@@ -52,13 +52,14 @@ func NewHybridP2PNetwork(log logging.Logger, cfg config.Local, datadir string, p
var childWsNetMeshCreator MeshCreator = meshCreator
var childP2PNetMeshCreator MeshCreator = meshCreator
var hybridMeshCreator MeshCreator = noopMeshCreator{}
noMeshCreatorAndHybridServer := meshCreator == nil && cfg.IsHybridServer()
_, isHybridMeshCreator := meshCreator.(hybridRelayMeshCreator)
if meshCreator == nil && cfg.IsHybridServer() || isHybridMeshCreator {
if noMeshCreatorAndHybridServer || isHybridMeshCreator {
// if no mesh creator was provided and this node is a listening/relaying node,
// override and use hybrid relay meshing.
// do the same if hybrid relay meshing was requested explicitly.
childWsNetMeshCreator = noopMeshCreator{}
childP2PNetMeshCreator = noopMeshPubSubFilteredCreator{}
childP2PNetMeshCreator = noopMeshCreator{}
hybridMeshCreator = hybridRelayMeshCreator{}
}

@@ -77,6 +78,7 @@
}

hybridMesh, err := hybridMeshCreator.create(
withTargetConnCount(cfg.GossipFanout),
withWebsocketNetwork(wsnet),
withP2PNetwork(p2pnet))
if err != nil {
@@ -187,7 +189,12 @@ func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func
}

// RequestConnectOutgoing implements GossipNode
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {}
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
_ = n.runParallel(func(net GossipNode) error {
net.RequestConnectOutgoing(replace, quit)
return nil
})
}

// GetPeers implements GossipNode
func (n *HybridP2PNetwork) GetPeers(options ...PeerOption) []Peer {
2 changes: 2 additions & 0 deletions network/hybridNetwork_test.go
@@ -39,6 +39,7 @@ func TestHybridNetwork_DuplicateConn(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableP2PHybridMode = true
cfg.DNSBootstrapID = ""
log := logging.TestingLog(t)
const p2pKeyDir = ""

@@ -208,6 +209,7 @@ func TestHybridNetwork_HybridRelayStrategy(t *testing.T) {

cfg := config.GetDefaultLocal()
cfg.EnableP2PHybridMode = true
cfg.DNSBootstrapID = ""
log := logging.TestingLog(t)

genesisInfo := GenesisInfo{genesisID, "net"}