network: wsnet with p2p backup meshing strategy #6391
base: master
```diff
@@ -24,6 +24,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/algorand/go-algorand/crypto"
+	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/test/partitiontest"
 )
@@ -92,7 +93,7 @@ func BenchmarkConnMonitor(b *testing.B) {
 	}
 }
 
-func TestConnMonitorStageTiming(t *testing.T) {
+func TestConnMonitor_StageTiming(t *testing.T) {
 	partitiontest.PartitionTest(t)
 
 	peers := []Peer{&wsPeer{}, &wsPeer{}, &wsPeer{}, &wsPeer{}}
@@ -130,7 +131,7 @@ func TestConnMonitorStageTiming(t *testing.T) {
 	}
 }
-func TestBucketsPruning(t *testing.T) {
+func TestConnMonitor_BucketsPruning(t *testing.T) {
 	partitiontest.PartitionTest(t)
 
 	bucketsCount := 100
```
```diff
@@ -160,3 +161,82 @@ func TestBucketsPruning(t *testing.T) {
 		require.Equal(t, bucketsCount-i, len(perfMonitor.pendingMessagesBuckets))
 	}
 }
+
+type mockOutgoingNet struct {
+	peers            []Peer
+	pending          int
+	disconnectedPeer Peer
+	disconnectReason disconnectReason
+	advanceCalled    bool
+}
+
+func (m *mockOutgoingNet) outgoingPeers() (peers []Peer) { return m.peers }
+func (m *mockOutgoingNet) numOutgoingPending() int       { return m.pending }
+func (m *mockOutgoingNet) disconnect(badnode Peer, reason disconnectReason) {
+	m.disconnectedPeer = badnode
+	m.disconnectReason = reason
+}
+func (m *mockOutgoingNet) OnNetworkAdvance() { m.advanceCalled = true }
+
+func TestConnMonitor_CheckExistingConnections_ThrottledPeers(t *testing.T) {
+	partitiontest.PartitionTest(t)
+	mon := makeConnectionPerformanceMonitor(nil)
+
+	p1 := &wsPeer{throttledOutgoingConnection: true}
+	mockNet := &mockOutgoingNet{peers: []Peer{p1}}
+	cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)
+
+	res := cc.checkExistingConnectionsNeedDisconnecting(2)
+	require.False(t, res)
+	require.Nil(t, mockNet.disconnectedPeer)
+
+	p2 := &wsPeer{throttledOutgoingConnection: false} // not throttled
+	mockNet = &mockOutgoingNet{peers: []Peer{p1, p2}}
+	cc = makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 100*time.Second)
+
+	mon.Reset(mockNet.peers)
+	mon.stage = pmStageStopped
+	mon.connectionDelay = map[Peer]int64{p1: 20, p2: 10}
+	mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 2}
+	mon.msgCount = 3
+
+	res = cc.checkExistingConnectionsNeedDisconnecting(2)
+	require.True(t, res, "expected disconnect")
+	require.Equal(t, p1, mockNet.disconnectedPeer)
+	require.Equal(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
+}
+
+func TestConnMonitor_CheckExistingConnections_NoThrottledPeers(t *testing.T) {
+	partitiontest.PartitionTest(t)
+	mon := makeConnectionPerformanceMonitor(nil)
+	p1 := &wsPeer{throttledOutgoingConnection: false}
+	p2 := &wsPeer{throttledOutgoingConnection: false}
+	mockNet := &mockOutgoingNet{peers: []Peer{p1, p2}}
+	cc := makeOutgoingConnsCloser(logging.TestingLog(t), mockNet, mon, 0)
+	mon.Reset(mockNet.peers)
+	mon.stage = pmStageStopped
+	mon.connectionDelay = map[Peer]int64{p1: 5, p2: 6}
+	mon.firstMessageCount = map[Peer]int64{p1: 1, p2: 1}
+	mon.msgCount = 2
+
+	res := cc.checkExistingConnectionsNeedDisconnecting(2)
+	require.True(t, res)
+	require.NotNil(t, mockNet.disconnectedPeer)
+	require.NotEqual(t, disconnectLeastPerformingPeer, mockNet.disconnectReason)
+}
+
+func TestNetworkAdvanceMonitor(t *testing.T) {
+	partitiontest.PartitionTest(t)
+	m := makeNetworkAdvanceMonitor()
+
+	require.True(t, m.lastAdvancedWithin(500*time.Millisecond))
+
+	m.mu.Lock()
+	m.lastNetworkAdvance = time.Now().Add(-2 * time.Second)
+	m.mu.Unlock()
+	require.False(t, m.lastAdvancedWithin(500*time.Millisecond), "expected false after stale interval")
+
+	// update and verify within again
+	m.updateLastAdvance()
+	require.True(t, m.lastAdvancedWithin(500*time.Millisecond))
+}
```

Review thread on the 500 ms assertions in TestNetworkAdvanceMonitor:

- Might need more time on actions workflow... can we make this up to a few seconds?
- (and the other cases like it)
- These two calls are within the same thread; imo this is unlikely but yes, possible. I suggest leaving it as is and mocking clocks only if it starts failing.
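If the 500 ms windows ever do flake on CI, the mock-clock option mentioned in the thread above is cheap to adopt. The sketch below shows only the general pattern: `fakeClock`, `advanceMonitor`, and its `now func() time.Time` field are hypothetical stand-ins invented for illustration, not the `networkAdvanceMonitor` API from this PR (which, per the diff, stores `lastNetworkAdvance` under `mu` and reads the wall clock directly).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakeClock lets a test control time explicitly instead of relying on
// real-time windows.
type fakeClock struct {
	mu  sync.Mutex
	now time.Time
}

func (c *fakeClock) Now() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.now
}

func (c *fakeClock) Advance(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.now.Add(d)
}

// advanceMonitor is an illustration-only analogue of the network-advance
// tracking above: it records the last "advance" timestamp and answers
// freshness queries, but takes its notion of time from an injected clock
// instead of time.Now.
type advanceMonitor struct {
	mu          sync.Mutex
	now         func() time.Time
	lastAdvance time.Time
}

func (m *advanceMonitor) updateLastAdvance() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.lastAdvance = m.now()
}

func (m *advanceMonitor) lastAdvancedWithin(d time.Duration) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.now().Sub(m.lastAdvance) <= d
}

func main() {
	clk := &fakeClock{now: time.Unix(0, 0)}
	m := &advanceMonitor{now: clk.Now}
	m.updateLastAdvance()

	fmt.Println(m.lastAdvancedWithin(500 * time.Millisecond)) // true: the clock has not moved
	clk.Advance(2 * time.Second)
	fmt.Println(m.lastAdvancedWithin(500 * time.Millisecond)) // false: 2s have "passed"
}
```

With a pattern like this the test advances time deterministically, so a 500 ms window never races a slow CI runner.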
Review comment on throttledOutgoingConnection:

I went down a rabbit hole trying to understand what "throttled connection" means and why a peer would be throttled, and it was really helpful to read the PR description here: #668. Basically, it is a "best of both worlds" approach to avoid clique formation while still allowing some peers to be performance-ranked: relays keep half of their peers as random, stable connections (not disconnected unless an error occurs), while the other half are performance-ranked and the least-performing peer is kicked out periodically.

It's not clear whether we need to cargo-cult all of this over to the P2PNetwork use case, but we can get rid of it once we are fully using pubsub for gossip.

A better name might be monitoredOutgoingConnection or performanceMonitoredOutgoingConnection, perhaps.
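For readers who, like the reviewer above, want the strategy in concrete terms, here is a small self-contained sketch of the half-stable / half-performance-ranked split in isolation. Everything in it (`peerConn`, the `monitored` flag, the single `score` value) is invented for illustration; the PR's real code uses `wsPeer.throttledOutgoingConnection` and ranks peers from the connection performance monitor's delay and first-message statistics rather than one scalar score.

```go
package main

import (
	"fmt"
	"math/rand"
)

// peerConn is an illustration-only stand-in for an outgoing connection;
// "monitored" plays the role of throttledOutgoingConnection in the PR.
type peerConn struct {
	addr      string
	monitored bool    // eligible for performance ranking and periodic eviction
	score     float64 // higher is better; stands in for the perf monitor's stats
}

// classifyOutgoing marks roughly half of the outgoing slots as monitored
// (performance-ranked); the rest stay as random, stable connections that are
// only dropped on errors.
func classifyOutgoing(peers []*peerConn) {
	for i, p := range peers {
		p.monitored = i%2 == 1
	}
}

// worstMonitored returns the lowest-scoring monitored peer, or nil if there
// is nothing eligible to evict (stable connections are never candidates).
func worstMonitored(peers []*peerConn) *peerConn {
	var worst *peerConn
	for _, p := range peers {
		if !p.monitored {
			continue
		}
		if worst == nil || p.score < worst.score {
			worst = p
		}
	}
	return worst
}

func main() {
	peers := []*peerConn{
		{addr: "relay-a"}, {addr: "relay-b"}, {addr: "relay-c"}, {addr: "relay-d"},
	}
	classifyOutgoing(peers)
	for _, p := range peers {
		p.score = rand.Float64() // pretend these came from a measurement window
	}
	if victim := worstMonitored(peers); victim != nil {
		fmt.Printf("evicting least-performing monitored peer %s (score %.2f)\n", victim.addr, victim.score)
	}
}
```

Keeping half of the slots out of the ranking is what gives the "best of both worlds" the reviewer describes: the stable half avoids clique formation, while the monitored half still lets a relay shed its slowest connection periodically.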