From c0aba4efbba9d2e53beb359de91c7469c782aa1d Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 19 Jun 2025 19:24:55 +0700 Subject: [PATCH 01/12] feat(statement-distribution): Implement handleIncomingManifestCommon() --- .../statement-distribution/grid_tracker.go | 13 +- .../session_topology_view.go | 26 +++ .../session_topology_view_test.go | 74 ++++++ .../statement-distribution/state_v2.go | 31 ++- .../statement_distribution.go | 212 +++++++++++++++++- 5 files changed, 344 insertions(+), 12 deletions(-) diff --git a/dot/parachain/statement-distribution/grid_tracker.go b/dot/parachain/statement-distribution/grid_tracker.go index 5bca5c9c87..bbffe1607a 100644 --- a/dot/parachain/statement-distribution/grid_tracker.go +++ b/dot/parachain/statement-distribution/grid_tracker.go @@ -23,6 +23,17 @@ const ( acknowledgement ) +func (mk manifestKind) String() string { + switch mk { + case full: + return "full" + case acknowledgement: + return "acknowledgement" + default: + panic("unreachable") + } +} + type validatorGroupPair struct { validator parachaintypes.ValidatorIndex group parachaintypes.GroupIndex @@ -95,7 +106,7 @@ func (g *gridTracker) importManifest( //skipcq: GO-R1005 sessionTopology *sessionTopologyView, groups groups, candidateHash parachaintypes.CandidateHash, - secondingLimit uint, //nolint:unparam + secondingLimit uint, manifest manifestSummary, kind manifestKind, sender parachaintypes.ValidatorIndex, diff --git a/dot/parachain/statement-distribution/session_topology_view.go b/dot/parachain/statement-distribution/session_topology_view.go index 09ac308bad..1166c5950e 100644 --- a/dot/parachain/statement-distribution/session_topology_view.go +++ b/dot/parachain/statement-distribution/session_topology_view.go @@ -4,6 +4,7 @@ package statementdistribution import ( + "iter" "maps" "slices" @@ -51,6 +52,31 @@ func newSessionTopologyView() *sessionTopologyView { } } +// iterSendingForGroup returns an iterator over all validator indices from the group who are 
allowed to +// send us manifests of the given kind. +func (stv *sessionTopologyView) iterSendingForGroup( + group parachaintypes.GroupIndex, + kind manifestKind, +) iter.Seq[parachaintypes.ValidatorIndex] { + return func(yield func(parachaintypes.ValidatorIndex) bool) { + groupView, ok := stv.groupViews[group] + if !ok { + return + } + + validators := groupView.receiving + if kind == acknowledgement { + validators = groupView.sending + } + + for validatorIndex := range validators { + if !yield(validatorIndex) { + return + } + } + } +} + func (stv *sessionTopologyView) clone() *sessionTopologyView { groupViews := make(map[parachaintypes.GroupIndex]groupSubView) diff --git a/dot/parachain/statement-distribution/session_topology_view_test.go b/dot/parachain/statement-distribution/session_topology_view_test.go index 5bab325ea1..19ec516786 100644 --- a/dot/parachain/statement-distribution/session_topology_view_test.go +++ b/dot/parachain/statement-distribution/session_topology_view_test.go @@ -149,3 +149,77 @@ func TestBuildSessionTopology(t *testing.T) { ) }) } + +func TestSessionTopologyView_IterSendingForGroup(t *testing.T) { + t.Parallel() + + t.Run("non_existent_group", func(t *testing.T) { + t.Parallel() + + view := newSessionTopologyView() + + var results []parachaintypes.ValidatorIndex + for i := range view.iterSendingForGroup(42, full) { + results = append(results, i) + } + + // Should be empty since group doesn't exist + require.Empty(t, results) + }) + + t.Run("manifestKind_full", func(t *testing.T) { + t.Parallel() + + view := newSessionTopologyView() + + groupIdx := parachaintypes.GroupIndex(1) + view.groupViews[groupIdx] = groupSubView{ + sending: map[parachaintypes.ValidatorIndex]struct{}{ + 5: {}, + 7: {}, + }, + receiving: map[parachaintypes.ValidatorIndex]struct{}{ + 1: {}, + 2: {}, + 3: {}, + }, + } + + var results []parachaintypes.ValidatorIndex + for i := range view.iterSendingForGroup(groupIdx, full) { + results = append(results, i) + } + + // For 
full manifest kind, should return indices from receiving map + require.Len(t, results, 3) + require.ElementsMatch(t, []parachaintypes.ValidatorIndex{1, 2, 3}, results) + }) + + t.Run("manifestKind_acknowledgement", func(t *testing.T) { + t.Parallel() + + view := newSessionTopologyView() + + groupIdx := parachaintypes.GroupIndex(1) + view.groupViews[groupIdx] = groupSubView{ + sending: map[parachaintypes.ValidatorIndex]struct{}{ + 5: {}, + 7: {}, + }, + receiving: map[parachaintypes.ValidatorIndex]struct{}{ + 1: {}, + 2: {}, + 3: {}, + }, + } + + var results []parachaintypes.ValidatorIndex + for i := range view.iterSendingForGroup(groupIdx, acknowledgement) { + results = append(results, i) + } + + // For acknowledgement kind, should return indices from sending map + require.Len(t, results, 2) + require.ElementsMatch(t, []parachaintypes.ValidatorIndex{5, 7}, results) + }) +} diff --git a/dot/parachain/statement-distribution/state_v2.go b/dot/parachain/statement-distribution/state_v2.go index bcca94ceab..fac517270c 100644 --- a/dot/parachain/statement-distribution/state_v2.go +++ b/dot/parachain/statement-distribution/state_v2.go @@ -37,12 +37,13 @@ type statementStore interface { // skipcq:SCC-U1000 type perRelayParentState struct { localValidator *localValidatorStore - statementStore statementStore // TODO #4719: Create statement store + statementStore statementStore // TODO: Use statement store impl secondingLimit uint session parachaintypes.SessionIndex transposedClaimQueue parachaintypes.TransposedClaimQueue groupsPerPara map[parachaintypes.ParaID][]parachaintypes.GroupIndex disabledValidators map[parachaintypes.ValidatorIndex]struct{} + assignmentsPerGroup map[parachaintypes.GroupIndex][]parachaintypes.ParaID } // isDisabled returns `true` if the given validator is disabled in the context of the relay parent. 
@@ -51,7 +52,7 @@ func (p *perRelayParentState) isDisabled(vIdx parachaintypes.ValidatorIndex) boo return ok } -func (p *perRelayParentState) disableBitmask(group []parachaintypes.ValidatorIndex) (parachaintypes.BitVec, error) { +func (p *perRelayParentState) disabledBitmask(group []parachaintypes.ValidatorIndex) (parachaintypes.BitVec, error) { disableBm := make([]bool, len(group)) for idx, v := range group { disableBm[idx] = p.isDisabled(v) @@ -83,7 +84,7 @@ type perSessionState struct { sessionInfo parachaintypes.SessionInfo groups *groups authLookup map[parachaintypes.AuthorityDiscoveryID]parachaintypes.ValidatorIndex - gridView any // TODO: use SessionTopologyView from statement-distribution grid (#4576) + gridView *sessionTopologyView // when localValidator is nil means it is inactive localValidator *parachaintypes.ValidatorIndex @@ -123,14 +124,17 @@ func newPerSessionState(sessionInfo parachaintypes.SessionInfo, // discovery being a superset of the active validators for consensus. // skipcq:SCC-U1000 func (s *perSessionState) supplyTopology(topology *grid.SessionGridTopology, localIdx *parachaintypes.ValidatorIndex) { - // TODO #4373: implement once buildSessionTopology is done - // gridView := buildSessionTopology( - // s.sessionInfo.ValidatorGroups, - // topology, - // localIdx, - // ) + gridView, err := buildSessionTopology( + s.sessionInfo.ValidatorGroups, + topology, + localIdx, + ) + if err != nil { + logger.Errorf("Failed to build sessionTopologyView for validator index %d: %s", localIdx, err) + return + } - // s.gridView = gridView + s.gridView = gridView logger.Infof( "Node uses the following topology indices: "+ @@ -138,6 +142,13 @@ func (s *perSessionState) supplyTopology(topology *grid.SessionGridTopology, loc localIdx, s.localValidator) } +// isNotValidator returns `true` if local is neither active or inactive validator node. +// +// `false` is also returned if session topology is not known yet. 
+func (s *perSessionState) isNotValidator() bool { + return s.gridView != nil && s.localValidator == nil +} + // skipcq:SCC-U1000 type peerState struct { view parachaintypes.View diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 984f554ec0..fafa51b7f1 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "slices" "time" "github.com/ChainSafe/gossamer/dot/parachain/backing" @@ -26,6 +27,48 @@ var ( errEmptyGroup = errors.New("group of validators empty") ) +var ( + costConflictingManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMajor, + Reason: "Manifest conflicts with previous", + } + + costMalformedManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMajor, + Reason: "Manifest is malformed", + } + + costInsufficientManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMajor, + Reason: "Manifest statements insufficient to back candidate", + } + + costUnexpectedManifestDisallowed = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMinor, + Reason: "Unexpected Manifest, Peer Disallowed", + } + + costUnexpectedManifestMissingKnowledge = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMinor, + Reason: "Unexpected Manifest, missing knowledge for relay parent", + } + + costUnexpectedManifestPeerUnknown = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMinor, + Reason: "Unexpected Manifest, Peer Unknown", + } + + costExcessiveSeconded = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMinor, + Reason: "Sent Excessive `Seconded` Statements", + } + + costInaccurateAdvertisement = 
parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMajor, + Reason: "Peer advertised a candidate inaccurately", + } +) + var logger = log.NewFromGlobal(log.AddContext("pkg", "parachain-statement-distribution")) type StatementDistribution struct { @@ -292,6 +335,172 @@ func (s *StatementDistribution) sendBackingFreshStatements( return nil } +type manifestImportSuccess struct { //nolint:unused + relayParentState perRelayParentState + perSession perSessionState + acknowledge bool + senderIndex parachaintypes.ValidatorIndex +} + +// handleIncomingManifestCommon handles the common part of incoming manifests of both types (full & acknowledgement) +// +// Basic sanity checks around data, importing the manifest into the grid tracker, finding the +// sending peer's validator index, reporting the peer for any misbehaviour, etc. +func (s *StatementDistribution) handleIncomingManifestCommon( //nolint:unused + peer peer.ID, + peers map[peer.ID]peerState, + perRelayParent map[common.Hash]perRelayParentState, + perSession map[parachaintypes.SessionIndex]perSessionState, + candidates candidates, + candidateHash parachaintypes.CandidateHash, + relayParent common.Hash, + paraID parachaintypes.ParaID, + manifestSummary manifestSummary, + manifestKind manifestKind, + reputation *parachainutil.ReputationAggregator, +) *manifestImportSuccess { + // 1. sanity checks: peer is connected, relay-parent in state, para ID matches group index. 
+ peerState, ok := peers[peer] + if !ok { + return nil + } + + relayParentState, ok := perRelayParent[relayParent] + if !ok { + reputation.Modify(s.SubSystemToOverseer, peer, costUnexpectedManifestMissingKnowledge) + return nil + } + + perSessionEntry, ok := perSession[relayParentState.session] + if !ok { + return nil + } + + if relayParentState.localValidator == nil { + if perSessionEntry.isNotValidator() { + reputation.Modify(s.SubSystemToOverseer, peer, costUnexpectedManifestMissingKnowledge) + } + return nil + } + + expectedGroups, ok := relayParentState.groupsPerPara[paraID] + if !ok { + reputation.Modify(s.SubSystemToOverseer, peer, costMalformedManifest) + return nil + } + + if !slices.Contains(expectedGroups, manifestSummary.claimedGroupIndex) { + reputation.Modify(s.SubSystemToOverseer, peer, costMalformedManifest) + return nil + } + + gridTopology := perSessionEntry.gridView + if gridTopology == nil { + return nil + } + + var senderIndex *parachaintypes.ValidatorIndex + for idx := range gridTopology.iterSendingForGroup(manifestSummary.claimedGroupIndex, manifestKind) { + if int(idx) >= len(perSessionEntry.sessionInfo.DiscoveryKeys) { + continue + } + + ad := perSessionEntry.sessionInfo.DiscoveryKeys[idx] + if peerState.isAuthority(ad) { + senderIndex = &idx + break + } + } + + if senderIndex == nil { + reputation.Modify(s.SubSystemToOverseer, peer, costUnexpectedManifestPeerUnknown) + return nil + } + + // 2. sanity checks: peer is validator, bitvec size, import into grid tracker + + // Ignore votes from disabled validators when counting towards the threshold. 
+ groupIndex := manifestSummary.claimedGroupIndex + group := perSessionEntry.groups.get(groupIndex) + disabledMask, err := relayParentState.disabledBitmask(group) + if err != nil { + logger.Criticalf("perRelayParentState.disabledBitmask() failed in handleIncomingManifestCommon(): %s", err) + return nil + } + + manifestSummary.statementKnowledge.MaskSeconded(disabledMask) + manifestSummary.statementKnowledge.MaskValid(disabledMask) + + assignments, ok := relayParentState.assignmentsPerGroup[groupIndex] + if !ok { + return nil + } + + var secondingLimit uint + for _, pID := range assignments { + if pID == paraID { + secondingLimit += 1 + } + } + + localValidator := *relayParentState.localValidator // non-nil check already done above + acknowledge, err := localValidator.gridTracker.importManifest( + gridTopology, + *perSessionEntry.groups, + candidateHash, + secondingLimit, + manifestSummary, + manifestKind, + *senderIndex, + ) + switch err { + case errManifestImportConflicting: + reputation.Modify(s.SubSystemToOverseer, peer, costConflictingManifest) + return nil + case errManifestImportOverflow: + reputation.Modify(s.SubSystemToOverseer, peer, costExcessiveSeconded) + return nil + case errManifestImportInsufficient: + reputation.Modify(s.SubSystemToOverseer, peer, costInsufficientManifest) + return nil + case errManifestImportMalformed: + reputation.Modify(s.SubSystemToOverseer, peer, costMalformedManifest) + return nil + case errManifestImportDisallowed: + reputation.Modify(s.SubSystemToOverseer, peer, costUnexpectedManifestDisallowed) + return nil + default: + } + + // 3. if accepted by grid, insert as unconfirmed. 
+ if err = candidates.insertUnconfirmed( + peer, + candidateHash, + relayParent, + groupIndex, + &hashAndParaID{manifestSummary.claimedParentHash, paraID}, + ); errors.Is(err, errBadAdvertisement) { + reputation.Modify(s.SubSystemToOverseer, peer, costInaccurateAdvertisement) + return nil + } + + if acknowledge { + logger.Tracef( + "immediate ack, known candidate: candidateHash=%s, from=%d, localIndex=%d, manifestKind=%s", + candidateHash.String(), + *senderIndex, + *perSessionEntry.localValidator, + manifestKind.String(), + ) + } + return &manifestImportSuccess{ + relayParentState: relayParentState, + perSession: perSessionEntry, + acknowledge: acknowledge, + senderIndex: *senderIndex, + } +} + // compareAndConvert ensure the original compact statement matches // the same encoding as the converted statement and transforms the // converted statement into a SignedFullStatementWithPVD with the @@ -417,7 +626,8 @@ func postAcknowledgementStatementMessages( groups *groups, groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, - peerID peer.ID, validationVersion validationprotocol.ValidationVersion, + peerID peer.ID, + validationVersion validationprotocol.ValidationVersion, ) []*networkbridgemessages.SendValidationMessage { sendingFilter := gridTracker.pendingStatementsFor(recipient, candidateHash) if sendingFilter == nil { From 8abae0001bc8d3e7243178ead713c2bd54fd2d4e Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 19 Jun 2025 20:28:33 +0700 Subject: [PATCH 02/12] remove statementStore interface --- .../mocks_generate_test.go | 1 - .../mocks_statement_store_test.go | 19 +- .../statement-distribution/state_v2.go | 18 +- .../statement_distribution.go | 14 +- .../statement_distribution_test.go | 269 +++++++++--------- .../statement-distribution/statement_store.go | 22 +- 6 files changed, 167 insertions(+), 176 deletions(-) diff --git a/dot/parachain/statement-distribution/mocks_generate_test.go 
b/dot/parachain/statement-distribution/mocks_generate_test.go index 284b640ea1..2f2fadc987 100644 --- a/dot/parachain/statement-distribution/mocks_generate_test.go +++ b/dot/parachain/statement-distribution/mocks_generate_test.go @@ -4,5 +4,4 @@ package statementdistribution //go:generate mockgen -destination=mocks_implicitview_test.go -package=$GOPACKAGE github.com/ChainSafe/gossamer/dot/parachain/util ImplicitView -//go:generate mockgen -destination=mocks_statement_store_test.go -package=$GOPACKAGE . statementStore //go:generate mockgen -destination=mocks_candidates_store_test.go -package=$GOPACKAGE . candidatesStore diff --git a/dot/parachain/statement-distribution/mocks_statement_store_test.go b/dot/parachain/statement-distribution/mocks_statement_store_test.go index 9574ae1a78..9d6814f458 100644 --- a/dot/parachain/statement-distribution/mocks_statement_store_test.go +++ b/dot/parachain/statement-distribution/mocks_statement_store_test.go @@ -53,10 +53,10 @@ func (mr *MockstatementStoreMockRecorder) fillStatementFilter(arg0, arg1, arg2 a } // freshStatementsForBacking mocks base method. -func (m *MockstatementStore) freshStatementsForBacking(validators []parachaintypes.ValidatorIndex, candidateHash parachaintypes.CandidateHash) []parachaintypes.SignedStatement { +func (m *MockstatementStore) freshStatementsForBacking(validators []parachaintypes.ValidatorIndex, candidateHash parachaintypes.CandidateHash) []*parachaintypes.SignedStatement { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "freshStatementsForBacking", validators, candidateHash) - ret0, _ := ret[0].([]parachaintypes.SignedStatement) + ret0, _ := ret[0].([]*parachaintypes.SignedStatement) return ret0 } @@ -67,10 +67,10 @@ func (mr *MockstatementStoreMockRecorder) freshStatementsForBacking(validators, } // groupStatements mocks base method. 
-func (m *MockstatementStore) groupStatements(arg0 *groups, arg1 parachaintypes.GroupIndex, arg2 parachaintypes.CandidateHash, arg3 *parachaintypes.StatementFilter) []parachaintypes.SignedStatement { +func (m *MockstatementStore) groupStatements(arg0 *groups, arg1 parachaintypes.GroupIndex, arg2 parachaintypes.CandidateHash, arg3 *parachaintypes.StatementFilter) []*parachaintypes.SignedStatement { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "groupStatements", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].([]parachaintypes.SignedStatement) + ret0, _ := ret[0].([]*parachaintypes.SignedStatement) return ret0 } @@ -93,15 +93,16 @@ func (mr *MockstatementStoreMockRecorder) noteKnownByBacking(arg0, arg1 any) *go } // validatorStatement mocks base method. -func (m *MockstatementStore) validatorStatement(stmt originatorStatementPair) *parachaintypes.SignedStatement { +func (m *MockstatementStore) validatorStatement(validatorIndex parachaintypes.ValidatorIndex, statement parachaintypes.CompactStatement) (*parachaintypes.SignedStatement, bool) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "validatorStatement", stmt) + ret := m.ctrl.Call(m, "validatorStatement", validatorIndex, statement) ret0, _ := ret[0].(*parachaintypes.SignedStatement) - return ret0 + ret1, _ := ret[1].(bool) + return ret0, ret1 } // validatorStatement indicates an expected call of validatorStatement. 
-func (mr *MockstatementStoreMockRecorder) validatorStatement(stmt any) *gomock.Call { +func (mr *MockstatementStoreMockRecorder) validatorStatement(validatorIndex, statement any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "validatorStatement", reflect.TypeOf((*MockstatementStore)(nil).validatorStatement), stmt) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "validatorStatement", reflect.TypeOf((*MockstatementStore)(nil).validatorStatement), validatorIndex, statement) } diff --git a/dot/parachain/statement-distribution/state_v2.go b/dot/parachain/statement-distribution/state_v2.go index fac517270c..e156b5edb2 100644 --- a/dot/parachain/statement-distribution/state_v2.go +++ b/dot/parachain/statement-distribution/state_v2.go @@ -18,26 +18,10 @@ type candidatesStore interface { getConfirmed(candidateHash parachaintypes.CandidateHash) (*confirmedCandidate, bool) } -type statementStore interface { - validatorStatement(stmt originatorStatementPair) *parachaintypes.SignedStatement - - // freshStatementsForBacking provides a list of all statements marked as being - // unknown by the backing subsystem. This provides `Seconded` statements prior to `Valid` statements. - freshStatementsForBacking(validators []parachaintypes.ValidatorIndex, - candidateHash parachaintypes.CandidateHash) []parachaintypes.SignedStatement - noteKnownByBacking(parachaintypes.ValidatorIndex, parachaintypes.CompactStatement) - fillStatementFilter(parachaintypes.GroupIndex, parachaintypes.CandidateHash, *parachaintypes.StatementFilter) - // Get an iterator over stored signed statements by the group conforming to the - // given filter. - // Seconded statements are provided first. 
- groupStatements(*groups, parachaintypes.GroupIndex, parachaintypes.CandidateHash, - *parachaintypes.StatementFilter) []parachaintypes.SignedStatement -} - // skipcq:SCC-U1000 type perRelayParentState struct { localValidator *localValidatorStore - statementStore statementStore // TODO: Use statement store impl + statementStore *statementStore secondingLimit uint session parachaintypes.SessionIndex transposedClaimQueue parachaintypes.TransposedClaimQueue diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index fafa51b7f1..76673f3112 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -312,7 +312,7 @@ func (s *StatementDistribution) sendBackingFreshStatements( panic(fmt.Sprintf("unexpected error setting Statement VDT: %s", err.Error())) } - signed, err := compareAndConvert(freshStmt, convertedStmt, withPVD) + signed, err := compareAndConvert(*freshStmt, convertedStmt, withPVD) if err != nil { return fmt.Errorf("comparing and converting stmt: %w", err) } @@ -544,7 +544,7 @@ func localKnowledgeFilter( groupSize int, groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, - statementStore statementStore, + statementStore *statementStore, // Store, ) (*parachaintypes.StatementFilter, error) { f, err := parachaintypes.NewStatementFilter(uint(groupSize), false) if err != nil { @@ -622,7 +622,7 @@ func postAcknowledgementStatementMessages( recipient parachaintypes.ValidatorIndex, rp common.Hash, gridTracker *gridTracker, - stmtStore statementStore, + stmtStore *statementStore, groups *groups, groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, @@ -648,7 +648,7 @@ func postAcknowledgementStatementMessages( stmtMessage := validationprotocol.NewStatementDistributionMessage() err := stmtMessage.SetValue(validationprotocol.Statement{ 
RelayParent: rp, - Compact: parachaintypes.UncheckedSignedCompactStatement(stmt), + Compact: parachaintypes.UncheckedSignedCompactStatement(*stmt), }) if err != nil { panic(fmt.Sprintf("failed while defining enum variant: %s", err.Error())) @@ -671,14 +671,14 @@ func postAcknowledgementStatementMessages( } func pendingStatementNetworkMessage( - stmtStore statementStore, + stmtStore *statementStore, rp common.Hash, peerID peer.ID, validationVersion validationprotocol.ValidationVersion, pending originatorStatementPair, ) *networkbridgemessages.SendValidationMessage { if validationVersion == validationprotocol.ValidationVersionV3 { - signed := stmtStore.validatorStatement(pending) - if signed == nil { + signed, known := stmtStore.validatorStatement(pending.validatorIndex, pending.statement) + if !known || signed == nil { return nil } diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 271b041e8a..16c18571a4 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -19,8 +19,6 @@ import ( func TestSendBackingFreshStatements(t *testing.T) { t.Run("should_send_3_statements_to_backing", func(t *testing.T) { - ctrl := gomock.NewController(t) - relayParent := common.Hash{0x01} groupIndex := parachaintypes.GroupIndex(0) @@ -35,14 +33,16 @@ func TestSendBackingFreshStatements(t *testing.T) { validators, {3, 4, 5}, } + + groups := newGroups(initGroups, 1) + sessionState := &perSessionState{ - groups: newGroups(initGroups, 1), + groups: groups, } - stmtStore := NewMockstatementStore(ctrl) - stmtStore.EXPECT(). - freshStatementsForBacking(validators, candidateHash). 
- Return([]parachaintypes.SignedStatement{ + stmtStore := newStatementStore(groups) + { + stmts := []parachaintypes.SignedStatement{ parachaintypes.SignedStatement( parachaintypes.UncheckedSignedCompactStatement{ ValidatorIndex: parachaintypes.ValidatorIndex(0), @@ -64,17 +64,14 @@ func TestSendBackingFreshStatements(t *testing.T) { Signature: parachaintypes.ValidatorSignature{0x05, 0x05, 0x05}, }, ), - }) - - stmtStore.EXPECT(). - noteKnownByBacking(parachaintypes.ValidatorIndex(0), - parachaintypes.NewCompactSeconded(candidateHash)) - stmtStore.EXPECT(). - noteKnownByBacking(parachaintypes.ValidatorIndex(1), - parachaintypes.NewCompactValid(candidateHash)) - stmtStore.EXPECT(). - noteKnownByBacking(parachaintypes.ValidatorIndex(2), - parachaintypes.NewCompactValid(candidateHash)) + } + + for _, stmt := range stmts { + newStmt, err := stmtStore.insert(groups, &stmt, statementOriginRemote) + require.NoError(t, err) + require.True(t, newStmt) + } + } relayParentState := &perRelayParentState{ statementStore: stmtStore, @@ -152,8 +149,6 @@ func TestSendBackingFreshStatements(t *testing.T) { }) t.Run("should_fail_when_confirmed_candidate_does_not_match", func(t *testing.T) { - ctrl := gomock.NewController(t) - relayParent := common.Hash{0x01} groupIndex := parachaintypes.GroupIndex(0) @@ -162,25 +157,33 @@ func TestSendBackingFreshStatements(t *testing.T) { require.NoError(t, err) candidateHash := parachaintypes.CandidateHash{Value: h} + groups := newGroups([][]parachaintypes.ValidatorIndex{{0, 1, 2}}, 1) + sessionState := &perSessionState{ - groups: newGroups([][]parachaintypes.ValidatorIndex{{0, 1, 2}}, 1), + groups: groups, } - stmtStore := NewMockstatementStore(ctrl) + stmtStore := newStatementStore(groups) // returning a compact statement with a candidate hash // that when encoded generates a different encoding from the // confirmed candidate we passed to sendBackingFreshStatements - stmtStore.EXPECT(). 
- freshStatementsForBacking([]parachaintypes.ValidatorIndex{0, 1, 2}, candidateHash). - Return([]parachaintypes.SignedStatement{ - parachaintypes.SignedStatement( - parachaintypes.UncheckedSignedCompactStatement{ - ValidatorIndex: parachaintypes.ValidatorIndex(0), - Payload: *parachaintypes.NewCompactSeconded( - parachaintypes.CandidateHash{Value: common.Hash{0xab, 0xab}}).ToEncodable(), - }, - ), - }) + freshStmt := parachaintypes.SignedStatement( + parachaintypes.UncheckedSignedCompactStatement{ + ValidatorIndex: parachaintypes.ValidatorIndex(0), + Payload: *parachaintypes.NewCompactSeconded( + parachaintypes.CandidateHash{Value: common.Hash{0xab, 0xab}}).ToEncodable(), + }, + ) + + fp := fingerprint{ + validator: parachaintypes.ValidatorIndex(1), + kind: fingerprintKindCompactSeconded, + candidateHash: candidateHash, + } + stmtStore.knownStmts[fp] = &storedStatement{ + stmt: &freshStmt, + knownByBacking: false, + } relayParentState := &perRelayParentState{ statementStore: stmtStore, @@ -316,24 +319,27 @@ func TestSendPendingGridMessages(t *testing.T) { peerID := peer.ID("peer-ex") v3 := validationprotocol.ValidationVersionV3 - var f *parachaintypes.StatementFilter - stmtStoreMock := NewMockstatementStore(ctrl) - stmtStoreMock.EXPECT(). - fillStatementFilter( - parachaintypes.GroupIndex(1), - parachaintypes.CandidateHash{Value: common.Hash{0x12}}, - gomock.AssignableToTypeOf((*parachaintypes.StatementFilter)(nil)), - ). 
- Do(func(_ parachaintypes.GroupIndex, _ parachaintypes.CandidateHash, filter *parachaintypes.StatementFilter) { - require.NoError(t, filter.SecondedInGroup.Set(1, true)) - f = filter - }) + stmtStore := newStatementStore(gps) + + seconded, err := parachaintypes.NewBitVec([]bool{false, true, false}) + require.NoError(t, err) + + valid, err := parachaintypes.NewBitVec([]bool{false, false, false}) + require.NoError(t, err) + + stmtStore.groupStmts[groupAndCandidateHash{ + groupIdx: parachaintypes.GroupIndex(1), + candidateHash: parachaintypes.CandidateHash{Value: common.Hash{0x12}}, + }] = &groupStatements{ + seconded, + valid, + } rpState := &perRelayParentState{ localValidator: &localValidatorStore{ gridTracker: gt, }, - statementStore: stmtStoreMock, + statementStore: stmtStore, } overseer := make(chan any, 1) @@ -341,7 +347,7 @@ func TestSendPendingGridMessages(t *testing.T) { SubSystemToOverseer: overseer, } - err := sd.sendPendingGridMessages(rpHash, peerID, + err = sd.sendPendingGridMessages(rpHash, peerID, v3, peerValidatorID, gps, rpState, candidatesMock, ) @@ -350,31 +356,27 @@ func TestSendPendingGridMessages(t *testing.T) { require.Len(t, overseer, 1) outgoingMessage := <-overseer - // building the validation protocol exepected message - manifest := validationprotocol.BackedCandidateManifest{ - RelayParent: rpHash, - CandidateHash: parachaintypes.CandidateHash{Value: common.Hash{0x12}}, - GroupIndex: parachaintypes.GroupIndex(1), - ParaID: parachaintypes.ParaID(10), - ParentHeadDataHash: common.Hash(bytes.Repeat([]byte{0xbc}, 32)), - StatementKnowledge: *f, - } + svm, ok := outgoingMessage.(networkbridgemessages.SendValidationMessages) + require.True(t, ok) + require.Len(t, svm.Messages, 1) - sdm := validationprotocol.NewStatementDistributionMessage() - require.NoError(t, sdm.SetValue(manifest)) + svmVal, err := svm.Messages[0].ValidationProtocolMessage.Value() + require.NoError(t, err) - expectedMessage := validationprotocol.NewValidationProtocolVDT() - 
require.NoError(t, expectedMessage.SetValue( - validationprotocol.StatementDistribution{StatementDistributionMessage: sdm})) + sdm, ok := svmVal.(validationprotocol.StatementDistribution) + require.True(t, ok) - require.Equal(t, networkbridgemessages.SendValidationMessages{ - Messages: []*networkbridgemessages.SendValidationMessage{ - { - To: []peer.ID{peerID}, - ValidationProtocolMessage: expectedMessage, - }, - }, - }, outgoingMessage) + sdmVal, err := sdm.StatementDistributionMessage.Value() + require.NoError(t, err) + + bcm, ok := sdmVal.(validationprotocol.BackedCandidateManifest) + require.True(t, ok) + + require.Equal(t, rpHash, bcm.RelayParent) + require.Equal(t, parachaintypes.CandidateHash{Value: common.Hash{0x12}}, bcm.CandidateHash) + require.Equal(t, parachaintypes.GroupIndex(1), bcm.GroupIndex) + require.Equal(t, parachaintypes.ParaID(10), bcm.ParaID) + require.Equal(t, common.Hash(bytes.Repeat([]byte{0xbc}, 32)), bcm.ParentHeadDataHash) }) t.Run("pending_full_and_ack_manifest_confirmed", func(t *testing.T) { @@ -422,37 +424,38 @@ func TestSendPendingGridMessages(t *testing.T) { peerID := peer.ID("peer-ex") v3 := validationprotocol.ValidationVersionV3 - var fstKnowledge *parachaintypes.StatementFilter - var sndKnowledge *parachaintypes.StatementFilter - - stmtStoreMock := NewMockstatementStore(ctrl) - stmtStoreMock.EXPECT(). - fillStatementFilter( - parachaintypes.GroupIndex(1), - parachaintypes.CandidateHash{Value: common.Hash{0x12}}, - gomock.AssignableToTypeOf((*parachaintypes.StatementFilter)(nil)), - ). - Do(func(_ parachaintypes.GroupIndex, _ parachaintypes.CandidateHash, filter *parachaintypes.StatementFilter) { - require.NoError(t, filter.SecondedInGroup.Set(1, true)) - fstKnowledge = filter - }) - - stmtStoreMock.EXPECT(). - fillStatementFilter( - parachaintypes.GroupIndex(0), - parachaintypes.CandidateHash{Value: common.Hash{0xab}}, - gomock.AssignableToTypeOf((*parachaintypes.StatementFilter)(nil)), - ). 
- Do(func(_ parachaintypes.GroupIndex, _ parachaintypes.CandidateHash, filter *parachaintypes.StatementFilter) { - require.NoError(t, filter.SecondedInGroup.Set(0, true)) - sndKnowledge = filter - }) + stmtStore := newStatementStore(gps) + + seconded, err := parachaintypes.NewBitVec([]bool{false, true, false}) + require.NoError(t, err) + + valid, err := parachaintypes.NewBitVec([]bool{false, false, false}) + require.NoError(t, err) + + stmtStore.groupStmts[groupAndCandidateHash{ + groupIdx: parachaintypes.GroupIndex(1), + candidateHash: parachaintypes.CandidateHash{Value: common.Hash{0x12}}, + }] = &groupStatements{ + seconded, + valid, + } + + seconded, err = parachaintypes.NewBitVec([]bool{true, false, false}) + require.NoError(t, err) + + stmtStore.groupStmts[groupAndCandidateHash{ + groupIdx: parachaintypes.GroupIndex(0), + candidateHash: parachaintypes.CandidateHash{Value: common.Hash{0xab}}, + }] = &groupStatements{ + seconded, + valid, + } rpState := &perRelayParentState{ localValidator: &localValidatorStore{ gridTracker: gt, }, - statementStore: stmtStoreMock, + statementStore: stmtStore, } overseer := make(chan any, 1) @@ -460,7 +463,7 @@ func TestSendPendingGridMessages(t *testing.T) { SubSystemToOverseer: overseer, } - err := sd.sendPendingGridMessages(rpHash, peerID, + err = sd.sendPendingGridMessages(rpHash, peerID, v3, peerValidatorID, gps, rpState, candidatesMock, ) @@ -469,47 +472,51 @@ func TestSendPendingGridMessages(t *testing.T) { require.Equal(t, 1, len(overseer)) outgoingMessage := <-overseer - // building the validation protocol exepected message - manifest := validationprotocol.BackedCandidateManifest{ - RelayParent: rpHash, - CandidateHash: parachaintypes.CandidateHash{Value: common.Hash{0x12}}, - GroupIndex: parachaintypes.GroupIndex(1), - ParaID: parachaintypes.ParaID(10), - ParentHeadDataHash: common.Hash(bytes.Repeat([]byte{0xbc}, 32)), - StatementKnowledge: *fstKnowledge, - } + svm, ok := 
outgoingMessage.(networkbridgemessages.SendValidationMessages) + require.True(t, ok) + require.Len(t, svm.Messages, 2) - ack := validationprotocol.BackedCandidateKnown{ - CandidateHash: parachaintypes.CandidateHash{Value: common.Hash{0xab}}, - StatementKnowledge: *sndKnowledge, - } + firstSvm, err := svm.Messages[0].ValidationProtocolMessage.Value() + require.NoError(t, err) - manifestSDM := validationprotocol.NewStatementDistributionMessage() - require.NoError(t, manifestSDM.SetValue(manifest)) + firstSdm, ok := firstSvm.(validationprotocol.StatementDistribution) + require.True(t, ok) - ackSDM := validationprotocol.NewStatementDistributionMessage() - require.NoError(t, ackSDM.SetValue(ack)) + firstSdmVal, err := firstSdm.StatementDistributionMessage.Value() + require.NoError(t, err) - manifestVPMessage := validationprotocol.NewValidationProtocolVDT() - require.NoError(t, manifestVPMessage.SetValue( - validationprotocol.StatementDistribution{StatementDistributionMessage: manifestSDM})) + secondSvm, err := svm.Messages[1].ValidationProtocolMessage.Value() + require.NoError(t, err) - ackVPMessage := validationprotocol.NewValidationProtocolVDT() - require.NoError(t, ackVPMessage.SetValue( - validationprotocol.StatementDistribution{StatementDistributionMessage: ackSDM})) + secondSdm, ok := secondSvm.(validationprotocol.StatementDistribution) + require.True(t, ok) - require.Equal(t, networkbridgemessages.SendValidationMessages{ - Messages: []*networkbridgemessages.SendValidationMessage{ - { - To: []peer.ID{peerID}, - ValidationProtocolMessage: manifestVPMessage, - }, - { - To: []peer.ID{peerID}, - ValidationProtocolMessage: ackVPMessage, - }, - }, - }, outgoingMessage) + secondSdmVal, err := secondSdm.StatementDistributionMessage.Value() + require.NoError(t, err) + + var bcm validationprotocol.BackedCandidateManifest + var bck validationprotocol.BackedCandidateKnown + + // order in svm.Messages is random + bcm, ok = 
firstSdmVal.(validationprotocol.BackedCandidateManifest) + if ok { + bck, ok = secondSdmVal.(validationprotocol.BackedCandidateKnown) + require.True(t, ok) + } else { + bck, ok = firstSdmVal.(validationprotocol.BackedCandidateKnown) + require.True(t, ok) + + bcm, ok = secondSdmVal.(validationprotocol.BackedCandidateManifest) + require.True(t, ok) + } + + require.Equal(t, rpHash, bcm.RelayParent) + require.Equal(t, parachaintypes.CandidateHash{Value: common.Hash{0x12}}, bcm.CandidateHash) + require.Equal(t, parachaintypes.GroupIndex(1), bcm.GroupIndex) + require.Equal(t, parachaintypes.ParaID(10), bcm.ParaID) + require.Equal(t, common.Hash(bytes.Repeat([]byte{0xbc}, 32)), bcm.ParentHeadDataHash) + + require.Equal(t, parachaintypes.CandidateHash{Value: common.Hash{0xab}}, bck.CandidateHash) }) // TODO: include tests for postAcknowledgementStatementMessages that diff --git a/dot/parachain/statement-distribution/statement_store.go b/dot/parachain/statement-distribution/statement_store.go index 4fda57da21..1ce8fa10c8 100644 --- a/dot/parachain/statement-distribution/statement_store.go +++ b/dot/parachain/statement-distribution/statement_store.go @@ -82,7 +82,7 @@ type groupAndCandidateHash struct { // Storage for statements. Intended to be used for statements signed under // the same relay-parent. 
-type statements struct { +type statementStore struct { validatorMeta map[parachaintypes.ValidatorIndex]*validatorMeta // we keep statements per-group because even though only one group _should_ be @@ -92,7 +92,7 @@ type statements struct { knownStmts map[fingerprint]*storedStatement } -func newStatementStore(groups *groups) *statements { +func newStatementStore(groups *groups) *statementStore { meta := map[parachaintypes.ValidatorIndex]*validatorMeta{} for gIdx, validators := range groups.all() { @@ -105,7 +105,7 @@ func newStatementStore(groups *groups) *statements { } } - return &statements{ + return &statementStore{ validatorMeta: meta, groupStmts: make(map[groupAndCandidateHash]*groupStatements), knownStmts: make(map[fingerprint]*storedStatement), @@ -114,11 +114,11 @@ func newStatementStore(groups *groups) *statements { // Insert adds a statement. Returns true if it was not known already, false if it was. // Ignores statements by unknown validators and returns an error. -func (s *statements) insert( +func (s *statementStore) insert( groups *groups, statement *parachaintypes.SignedStatement, origin statementOrigin, -) (bool, error) { //nolint:unparam +) (bool, error) { validatorIndex := statement.ValidatorIndex validatorMeta, ok := s.validatorMeta[validatorIndex] if !ok { @@ -195,7 +195,7 @@ func (s *statements) insert( } // fillStatementFilter fills a StatementFilter with all statements already known for the given group and candidate hash. -func (s *statements) fillStatementFilter( //nolint:unused +func (s *statementStore) fillStatementFilter( groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, statementFilter *parachaintypes.StatementFilter, @@ -212,7 +212,7 @@ func (s *statements) fillStatementFilter( //nolint:unused // groupStatements returns all stored signed statements by the group conforming to the given filter. // Seconded statements are provided first. 
-func (s *statements) groupStatements( //nolint:unused +func (s *statementStore) groupStatements( groups *groups, groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, @@ -256,7 +256,7 @@ func (s *statements) groupStatements( //nolint:unused } // validatorStatement returns the full statement of this kind issued by this validator, if it is known. -func (s *statements) validatorStatement( //nolint:unused +func (s *statementStore) validatorStatement( validatorIndex parachaintypes.ValidatorIndex, statement parachaintypes.CompactStatement, ) (*parachaintypes.SignedStatement, bool) { @@ -277,7 +277,7 @@ func (s *statements) validatorStatement( //nolint:unused // freshStatementsForBacking returns all statements for the given candidate hash // and validators that are not yet known by backing. // Seconded statements are provided before Valid statements. -func (s *statements) freshStatementsForBacking( +func (s *statementStore) freshStatementsForBacking( validators []parachaintypes.ValidatorIndex, candidateHash parachaintypes.CandidateHash, ) []*parachaintypes.SignedStatement { @@ -303,7 +303,7 @@ func (s *statements) freshStatementsForBacking( } // secondedCount returns the amount of known Seconded statements by the given validator index. -func (s *statements) secondedCount( //nolint:unused +func (s *statementStore) secondedCount( //nolint:unused validatorIndex parachaintypes.ValidatorIndex, ) uint { if meta, ok := s.validatorMeta[validatorIndex]; ok { @@ -313,7 +313,7 @@ func (s *statements) secondedCount( //nolint:unused } // noteKnownByBacking marks a statement as known by the backing subsystem. 
-func (s *statements) noteKnownByBacking( //nolint:unused +func (s *statementStore) noteKnownByBacking( validatorIndex parachaintypes.ValidatorIndex, statement parachaintypes.CompactStatement, ) { From 73c0826ab46cf622ee772e13e41250e5f4a8dd1c Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Tue, 24 Jun 2025 16:58:13 +0700 Subject: [PATCH 03/12] add tests for handleIncomingManifestCommon() --- .../statement_distribution.go | 20 +- .../statement_distribution_test.go | 195 ++++++++++++++++++ 2 files changed, 205 insertions(+), 10 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 76673f3112..9a50784c04 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -28,42 +28,42 @@ var ( ) var ( - costConflictingManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + costConflictingManifest = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMajor, Reason: "Manifest conflicts with previous", } - costMalformedManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + costMalformedManifest = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMajor, Reason: "Manifest is malformed", } - costInsufficientManifest = parachainutil.UnifiedReputationChange{ //nolint:unused + costInsufficientManifest = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMajor, Reason: "Manifest statements insufficient to back candidate", } - costUnexpectedManifestDisallowed = parachainutil.UnifiedReputationChange{ //nolint:unused + costUnexpectedManifestDisallowed = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMinor, Reason: "Unexpected Manifest, Peer Disallowed", } - costUnexpectedManifestMissingKnowledge = parachainutil.UnifiedReputationChange{ //nolint:unused + costUnexpectedManifestMissingKnowledge = 
parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMinor, Reason: "Unexpected Manifest, missing knowledge for relay parent", } - costUnexpectedManifestPeerUnknown = parachainutil.UnifiedReputationChange{ //nolint:unused + costUnexpectedManifestPeerUnknown = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMinor, Reason: "Unexpected Manifest, Peer Unknown", } - costExcessiveSeconded = parachainutil.UnifiedReputationChange{ //nolint:unused + costExcessiveSeconded = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMinor, Reason: "Sent Excessive `Seconded` Statements", } - costInaccurateAdvertisement = parachainutil.UnifiedReputationChange{ //nolint:unused + costInaccurateAdvertisement = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMajor, Reason: "Peer advertised a candidate inaccurately", } @@ -335,7 +335,7 @@ func (s *StatementDistribution) sendBackingFreshStatements( return nil } -type manifestImportSuccess struct { //nolint:unused +type manifestImportSuccess struct { relayParentState perRelayParentState perSession perSessionState acknowledge bool @@ -346,7 +346,7 @@ type manifestImportSuccess struct { //nolint:unused // // Basic sanity checks around data, importing the manifest into the grid tracker, finding the // sending peer's validator index, reporting the peer for any misbehaviour, etc. 
-func (s *StatementDistribution) handleIncomingManifestCommon( //nolint:unused +func (s *StatementDistribution) handleIncomingManifestCommon( peer peer.ID, peers map[peer.ID]peerState, perRelayParent map[common.Hash]perRelayParentState, diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 16c18571a4..4b2ab3fb84 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -10,7 +10,9 @@ import ( "github.com/ChainSafe/gossamer/dot/parachain/backing" networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "github.com/ChainSafe/gossamer/dot/parachain/util" validationprotocol "github.com/ChainSafe/gossamer/dot/parachain/validation-protocol" + "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/lib/common" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" @@ -522,3 +524,196 @@ func TestSendPendingGridMessages(t *testing.T) { // TODO: include tests for postAcknowledgementStatementMessages that // integrates grid tracker pending statements and statement store } + +func TestHandleIncomingManifestCommon(t *testing.T) { + pierre := peer.ID("pierre") + candidateHash := parachaintypes.CandidateHash{Value: common.Hash{0x12}} + relayParent := common.Hash{0xab} + paraID := parachaintypes.ParaID(10) + + t.Run("peer_not_connected", func(t *testing.T) { + sd := StatementDistribution{} + + importSuccess := sd.handleIncomingManifestCommon( + pierre, + make(map[peer.ID]peerState), + make(map[common.Hash]perRelayParentState), + make(map[parachaintypes.SessionIndex]perSessionState), + candidates{}, + candidateHash, + relayParent, + paraID, + manifestSummary{}, + full, + nil, + ) + + require.Nil(t, importSuccess) + }) + + 
t.Run("not_in_relay_parent_state", func(t *testing.T) { + peers := map[peer.ID]peerState{ + pierre: {}, + } + + repAgg := util.NewReputationAggregator(func(rep util.UnifiedReputationChange) bool { return false }) + + overseer := make(chan any, 1) + sd := StatementDistribution{ + SubSystemToOverseer: overseer, + } + + importSuccess := sd.handleIncomingManifestCommon( + pierre, + peers, + make(map[common.Hash]perRelayParentState), + make(map[parachaintypes.SessionIndex]perSessionState), + candidates{}, + candidateHash, + relayParent, + paraID, + manifestSummary{}, + full, + repAgg, + ) + + require.Nil(t, importSuccess) + + repAgg.Send(overseer) + require.Len(t, overseer, 1) + + msg := <-overseer + + repMsg, ok := msg.(networkbridgemessages.ReportPeer) + require.True(t, ok) + require.Equal(t, pierre, repMsg.PeerID) + + require.Equal( + t, + peerset.Reputation(costUnexpectedManifestMissingKnowledge.CostOrBenefit()), + repMsg.ReputationChange.Value, + ) + + require.Equal(t, costUnexpectedManifestMissingKnowledge.Reason, repMsg.ReputationChange.Reason) + }) + + t.Run("happy_path", func(t *testing.T) { + groupIndex := parachaintypes.GroupIndex(0) + + manifestSummary := manifestSummary{ + claimedGroupIndex: groupIndex, + claimedParentHash: common.Hash{0x03}, + } + + seconded, err := parachaintypes.NewBitVec([]bool{true, false, false}) + require.NoError(t, err) + valid, err := parachaintypes.NewBitVec([]bool{false, true, false}) + require.NoError(t, err) + + manifestSummary.statementKnowledge = parachaintypes.StatementFilter{ + SecondedInGroup: seconded, + ValidatedInGroup: valid, + } + + authKey := [32]byte{0x04} + peerStateEntry := peerState{ + discoveryIds: &map[parachaintypes.AuthorityDiscoveryID]struct{}{authKey: {}}, + } + peers := map[peer.ID]peerState{ + pierre: peerStateEntry, + } + + gt := newGridTracker() + localValidator := &localValidatorStore{ + gridTracker: gt, + } + + initGroups := [][]parachaintypes.ValidatorIndex{ + {0, 1, 2}, + {3, 4, 5}, + } + groups := 
newGroups(initGroups, 2) + + relayParentState := perRelayParentState{ + session: 1, + localValidator: localValidator, + groupsPerPara: map[parachaintypes.ParaID][]parachaintypes.GroupIndex{paraID: {groupIndex}}, + assignmentsPerGroup: map[parachaintypes.GroupIndex][]parachaintypes.ParaID{ + groupIndex: {paraID}, + }, + } + perRelayParent := map[common.Hash]perRelayParentState{ + relayParent: relayParentState, + } + + validatorIndex := parachaintypes.ValidatorIndex(1) + gridTopology := &sessionTopologyView{ + groupViews: map[parachaintypes.GroupIndex]groupSubView{ + groupIndex: { + sending: make(map[parachaintypes.ValidatorIndex]struct{}), + receiving: map[parachaintypes.ValidatorIndex]struct{}{validatorIndex: {}}, + }, + }, + } + + sessionInfo := parachaintypes.SessionInfo{ + DiscoveryKeys: []parachaintypes.AuthorityDiscoveryID{ + {0x0a}, // Validator 0 + authKey, // Validator 1 - matches our peer's authority key + {0x0c}, // Validator 2 + }, + } + + sessionState := perSessionState{ + gridView: gridTopology, + groups: groups, + sessionInfo: sessionInfo, + localValidator: &validatorIndex, + } + perSession := map[parachaintypes.SessionIndex]perSessionState{ + 1: sessionState, + } + + candidates := candidates{ + candidates: make(map[parachaintypes.CandidateHash]candidateState), + byParent: make(map[hashAndParaID]map[parachaintypes.CandidateHash]struct{}), + } + + reputation := util.NewReputationAggregator(func(rep util.UnifiedReputationChange) bool { return false }) + + overseerCh := make(chan any, 1) + sd := &StatementDistribution{ + SubSystemToOverseer: overseerCh, + } + + importSuccess := sd.handleIncomingManifestCommon( + pierre, + peers, + perRelayParent, + perSession, + candidates, + candidateHash, + relayParent, + paraID, + manifestSummary, + full, + reputation, + ) + + require.NotNil(t, importSuccess) + require.Equal(t, relayParentState, importSuccess.relayParentState) + require.Equal(t, sessionState, importSuccess.perSession) + require.Equal(t, 
validatorIndex, importSuccess.senderIndex) + + // Verify the candidate was added as unconfirmed in candidate store and grid tracker + + _, ok := candidates.candidates[candidateHash] + require.True(t, ok) + + validatorAndGroups, ok := gt.unconfirmed[candidateHash] + require.True(t, ok) + require.Len(t, validatorAndGroups, 1) + require.Equal(t, validatorIndex, validatorAndGroups[0].validator) + require.Equal(t, groupIndex, validatorAndGroups[0].group) + }) +} From d1ed3016f35d44cabf60a5d4238426f173e231a4 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Wed, 25 Jun 2025 17:25:55 +0700 Subject: [PATCH 04/12] feat(statement-distribution): Implement handleIncomingManifest() --- .../statement-distribution/state_v2.go | 5 +- .../statement_distribution.go | 90 ++++++++++++++++++- 2 files changed, 92 insertions(+), 3 deletions(-) diff --git a/dot/parachain/statement-distribution/state_v2.go b/dot/parachain/statement-distribution/state_v2.go index e156b5edb2..b9e47523e6 100644 --- a/dot/parachain/statement-distribution/state_v2.go +++ b/dot/parachain/statement-distribution/state_v2.go @@ -12,6 +12,7 @@ import ( validationprotocol "github.com/ChainSafe/gossamer/dot/parachain/validation-protocol" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/keystore" + "github.com/libp2p/go-libp2p/core/peer" ) type candidatesStore interface { @@ -212,11 +213,11 @@ func (p *peerState) iterKnownDiscoveryIDs() []parachaintypes.AuthorityDiscoveryI type v2State struct { implicitView parachainutil.ImplicitView - candidates any // TODO #4718: Create Candidates Tracker + candidates candidates perRelayParent map[common.Hash]perRelayParentState perSession map[parachaintypes.SessionIndex]perSessionState unusedTopologies map[parachaintypes.SessionIndex]events.NewGossipTopology - peers map[string]peerState + peers map[peer.ID]peerState keystore keystore.Keystore authorities map[parachaintypes.AuthorityDiscoveryID]string requestManager any // TODO: #4377 diff --git 
a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 9a50784c04..dd95dca77b 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -501,6 +501,94 @@ func (s *StatementDistribution) handleIncomingManifestCommon( } } +func (s *StatementDistribution) handleIncomingManifest( //nolint:unused + state v2State, + peer peer.ID, + manifest validationprotocol.BackedCandidateManifest, + reputation *parachainutil.ReputationAggregator, +) { + logger.Debugf("Received incoming manifest peer=%s candidateHash=%s", peer.String(), manifest.CandidateHash.String()) + + importSuccess := s.handleIncomingManifestCommon( + peer, + state.peers, + state.perRelayParent, + state.perSession, + state.candidates, + manifest.CandidateHash, + manifest.RelayParent, + manifest.ParaID, + manifestSummary{ + claimedParentHash: manifest.ParentHeadDataHash, + claimedGroupIndex: manifest.GroupIndex, + statementKnowledge: manifest.StatementKnowledge, + }, + full, + reputation, + ) + + if importSuccess == nil { + return + } + + rpState := importSuccess.relayParentState + perSession := importSuccess.perSession + senderIndex := importSuccess.senderIndex + + if importSuccess.acknowledge { + // 4. if already known within grid (confirmed & backed), acknowledge candidate + logger.Tracef("Known candidate - acknowledging manifest candidateHash=%s", manifest.CandidateHash.String()) + + group := perSession.groups.get(manifest.GroupIndex) + if group == nil { + return // sanity + } + + localKnowledge, err := localKnowledgeFilter( + len(group), + manifest.GroupIndex, + manifest.CandidateHash, + rpState.statementStore, + ) + if err != nil { + logger.Errorf("building local knowledge filter: %s", err) + return + } + + // Assume the latest stable version, if we don't have info about peer version. 
+ validationVersion := validationprotocol.ValidationVersionV3 + peerState, ok := state.peers[peer] + if ok { + validationVersion = peerState.protocolVersion + } + + messages /* statementsCount */, _ := acknowledgementAndStatementMessages( + peer, + validationVersion, + senderIndex, + perSession.groups, + &rpState, + manifest.RelayParent, + manifest.GroupIndex, + manifest.CandidateHash, + *localKnowledge, + ) + + if len(messages) != 0 { + s.SubSystemToOverseer <- networkbridgemessages.SendValidationMessages{Messages: messages} + // TODO metrics.on_statements_distributed(statements_count) + } + } else if !state.candidates.isConfirmed(manifest.CandidateHash) { + // 5. if unconfirmed, add request entry + logger.Tracef("Unknown candidate - requesting candidateHash=%s", manifest.CandidateHash.String()) + + // TODO #4377 + //state.requestManager. + // getOrInsert(manifest.RelayParent, manifest.CandidateHash, manifest.GroupIndex). + // addPeer(peer) + } +} + // compareAndConvert ensure the original compact statement matches // the same encoding as the converted statement and transforms the // converted statement into a SignedFullStatementWithPVD with the @@ -566,7 +654,7 @@ func acknowledgementAndStatementMessages( groupIndex parachaintypes.GroupIndex, candidateHash parachaintypes.CandidateHash, localKnowledge parachaintypes.StatementFilter, -) ([]*networkbridgemessages.SendValidationMessage, int) { +) ([]*networkbridgemessages.SendValidationMessage, int) { //nolint:unparam if rpState.localValidator == nil { return []*networkbridgemessages.SendValidationMessage{}, 0 } From d4d9c7dcf9eb55f35449de5ddbff0f9bf37e4848 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Wed, 25 Jun 2025 18:05:26 +0700 Subject: [PATCH 05/12] feat(statement-distribution): Implement handleIncomingAcknowledgement() --- .../statement_distribution.go | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/dot/parachain/statement-distribution/statement_distribution.go 
b/dot/parachain/statement-distribution/statement_distribution.go index dd95dca77b..06b7283963 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -58,6 +58,11 @@ var ( Reason: "Unexpected Manifest, Peer Unknown", } + costUnexpectedAcknowledgementUnknownCandidate = parachainutil.UnifiedReputationChange{ //nolint:unused + Type: parachainutil.CostMinor, + Reason: "Unexpected acknowledgement, unknown candidate", + } + costExcessiveSeconded = parachainutil.UnifiedReputationChange{ Type: parachainutil.CostMinor, Reason: "Sent Excessive `Seconded` Statements", @@ -589,6 +594,89 @@ func (s *StatementDistribution) handleIncomingManifest( //nolint:unused } } +func (s *StatementDistribution) handleIncomingAcknowledgement( //nolint:unused + state v2State, + peer peer.ID, + ack validationprotocol.BackedCandidateKnown, + reputation *parachainutil.ReputationAggregator, +) { + // The key difference between acknowledgments and full manifests is that only + // the candidate hash is included alongside the bitfields, so the candidate + // must be confirmed for us to even process it. 
+ + logger.Debugf( + "Received incoming acknowledgement peer=%s candidateHash=%s", + peer.String(), + ack.CandidateHash.String(), + ) + + confirmed, ok := state.candidates.getConfirmed(ack.CandidateHash) + if !ok { + reputation.Modify(s.SubSystemToOverseer, peer, costUnexpectedAcknowledgementUnknownCandidate) + return + } + + relayParent := confirmed.receipt.Descriptor.RelayParent + parentHeadDataHash := confirmed.parentHash + groupIndex := confirmed.assignedGroup + paraID := confirmed.receipt.Descriptor.ParaID + + importSuccess := s.handleIncomingManifestCommon( + peer, + state.peers, + state.perRelayParent, + state.perSession, + state.candidates, + ack.CandidateHash, + relayParent, + paraID, + manifestSummary{ + claimedParentHash: parentHeadDataHash, + claimedGroupIndex: groupIndex, + statementKnowledge: ack.StatementKnowledge, + }, + acknowledgement, + reputation, + ) + + if importSuccess == nil { + return + } + + rpState := importSuccess.relayParentState + perSession := importSuccess.perSession + senderIndex := importSuccess.senderIndex + + localValidator := rpState.localValidator + if localValidator == nil { + return + } + + // Assume the latest stable version, if we don't have info about peer version. 
+ validationVersion := validationprotocol.ValidationVersionV3 + peerState, ok := state.peers[peer] + if ok { + validationVersion = peerState.protocolVersion + } + + messages := postAcknowledgementStatementMessages( + senderIndex, + relayParent, + localValidator.gridTracker, + rpState.statementStore, + perSession.groups, + groupIndex, + ack.CandidateHash, + peer, + validationVersion, + ) + + if len(messages) != 0 { + s.SubSystemToOverseer <- networkbridgemessages.SendValidationMessages{Messages: messages} + // TODO metrics.on_statements_distributed(len(messages)) + } +} + // compareAndConvert ensure the original compact statement matches // the same encoding as the converted statement and transforms the // converted statement into a SignedFullStatementWithPVD with the From c0ed0506508d4bee4b5e6ebe3140bff1bb7bfaac Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 26 Jun 2025 17:36:11 +0700 Subject: [PATCH 06/12] add test for StatementDistribution.handleIncomingManifest() --- .../statement_distribution.go | 2 +- .../statement_distribution_test.go | 128 ++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 06b7283963..44f6585083 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -506,7 +506,7 @@ func (s *StatementDistribution) handleIncomingManifestCommon( } } -func (s *StatementDistribution) handleIncomingManifest( //nolint:unused +func (s *StatementDistribution) handleIncomingManifest( state v2State, peer peer.ID, manifest validationprotocol.BackedCandidateManifest, diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 4b2ab3fb84..1b3b2bab0d 100644 --- 
a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -717,3 +717,131 @@ func TestHandleIncomingManifestCommon(t *testing.T) { require.Equal(t, groupIndex, validatorAndGroups[0].group) }) } + +func TestHandleIncomingManifest(t *testing.T) { + overseerCh := make(chan any, 10) + + sd := &StatementDistribution{ + SubSystemToOverseer: overseerCh, + } + + peerID := peer.ID("pierre") + groupIndex := parachaintypes.GroupIndex(0) + validatorIndex := parachaintypes.ValidatorIndex(1) + candidateHash := parachaintypes.CandidateHash{Value: common.Hash{1, 2, 3, 4}} + relayParent := common.Hash{5, 6, 7, 8} + paraID := parachaintypes.ParaID(42) + authKey := [32]byte{0x04} + + seconded, err := parachaintypes.NewBitVec([]bool{true, false, false}) + require.NoError(t, err) + valid, err := parachaintypes.NewBitVec([]bool{false, true, false}) + require.NoError(t, err) + + manifest := validationprotocol.BackedCandidateManifest{ + RelayParent: relayParent, + CandidateHash: candidateHash, + GroupIndex: groupIndex, + ParaID: paraID, + ParentHeadDataHash: common.Hash{9, 10, 11, 12}, + StatementKnowledge: parachaintypes.StatementFilter{ + SecondedInGroup: seconded, + ValidatedInGroup: valid, + }, + } + + groups := newGroups( + [][]parachaintypes.ValidatorIndex{ + {0, 1, 2}, + {3, 4, 5}, + }, + 2, + ) + + gt := newGridTracker() + gt.confirmedBacked[candidateHash] = knownBackedCandidate{ + mutualKnowledge: make(map[parachaintypes.ValidatorIndex]mutualKnowledge), + } + + mockState := v2State{ + peers: map[peer.ID]peerState{ + peerID: { + protocolVersion: validationprotocol.ValidationVersionV3, + discoveryIds: &map[parachaintypes.AuthorityDiscoveryID]struct{}{authKey: {}}, + }, + }, + perRelayParent: map[common.Hash]perRelayParentState{ + relayParent: { + session: 1, + localValidator: &localValidatorStore{ + gridTracker: gt, + }, + groupsPerPara: map[parachaintypes.ParaID][]parachaintypes.GroupIndex{ + 
paraID: {groupIndex}, + }, + assignmentsPerGroup: map[parachaintypes.GroupIndex][]parachaintypes.ParaID{ + groupIndex: {paraID}, + }, + statementStore: newStatementStore(groups), + }, + }, + perSession: map[parachaintypes.SessionIndex]perSessionState{ + 1: { + localValidator: &validatorIndex, + gridView: &sessionTopologyView{ + groupViews: map[parachaintypes.GroupIndex]groupSubView{ + groupIndex: { + sending: make(map[parachaintypes.ValidatorIndex]struct{}), + receiving: map[parachaintypes.ValidatorIndex]struct{}{validatorIndex: {}}, + }, + }, + }, + groups: groups, + sessionInfo: parachaintypes.SessionInfo{ + DiscoveryKeys: []parachaintypes.AuthorityDiscoveryID{ + {0x0a}, // Validator 0 + authKey, // Validator 1 - matches our peer's authority key + {0x0c}, // Validator 2 + }, + }, + }, + }, + candidates: candidates{ + candidates: map[parachaintypes.CandidateHash]candidateState{ + candidateHash: &confirmedCandidate{ + receipt: parachaintypes.CommittedCandidateReceiptV2{ + Descriptor: parachaintypes.CandidateDescriptorV2{ + RelayParent: relayParent, + ParaID: paraID, + }, + }, + assignedGroup: groupIndex, + parentHash: common.Hash{9, 10, 11, 12}, + importableUnder: map[common.Hash]struct{}{ + {13}: {}, + }, + }, + }, + }, + } + + reputation := util.NewReputationAggregator(func(util.UnifiedReputationChange) bool { return false }) + + sd.handleIncomingManifest(mockState, peerID, manifest, reputation) + + require.Len(t, overseerCh, 1) + + var message any + select { + case message = <-overseerCh: + default: + t.Fatal("No message was sent to the overseer") + } + + sendMessages, ok := message.(networkbridgemessages.SendValidationMessages) + require.True(t, ok) + + require.NotEmpty(t, sendMessages.Messages) + require.Contains(t, sendMessages.Messages[0].To, peerID) + require.NotNil(t, sendMessages.Messages[0].ValidationProtocolMessage) +} From d5d24c1d52e305de03d098e88c9c12282976fef2 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 26 Jun 2025 19:58:51 +0700 Subject: 
[PATCH 07/12] parallelify the tests --- .../statement_distribution_test.go | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 4b2ab3fb84..8c708ed72e 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -20,7 +20,11 @@ import ( ) func TestSendBackingFreshStatements(t *testing.T) { + t.Parallel() + t.Run("should_send_3_statements_to_backing", func(t *testing.T) { + t.Parallel() + relayParent := common.Hash{0x01} groupIndex := parachaintypes.GroupIndex(0) @@ -151,6 +155,8 @@ func TestSendBackingFreshStatements(t *testing.T) { }) t.Run("should_fail_when_confirmed_candidate_does_not_match", func(t *testing.T) { + t.Parallel() + relayParent := common.Hash{0x01} groupIndex := parachaintypes.GroupIndex(0) @@ -206,7 +212,11 @@ func TestSendBackingFreshStatements(t *testing.T) { } func TestSendPendingGridMessages(t *testing.T) { + t.Parallel() + t.Run("nil_local_validator", func(t *testing.T) { + t.Parallel() + rpHash := common.Hash{0xab} peerID := peer.ID("peer-ex") validationVersion := validationprotocol.ValidationVersionV3 @@ -225,6 +235,8 @@ func TestSendPendingGridMessages(t *testing.T) { }) t.Run("empty_pending_manifests_for_validator_id", func(t *testing.T) { + t.Parallel() + gt := newGridTracker() rpHash := common.Hash{0xab} @@ -247,6 +259,8 @@ func TestSendPendingGridMessages(t *testing.T) { }) t.Run("pending_stmts_but_none_confirmed", func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) peerValidatorID := parachaintypes.ValidatorIndex(0) @@ -285,6 +299,8 @@ func TestSendPendingGridMessages(t *testing.T) { }) t.Run("pending_full_manifest_confirmed", func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) peerValidatorID := parachaintypes.ValidatorIndex(4) @@ -382,6 +398,8 
@@ func TestSendPendingGridMessages(t *testing.T) { }) t.Run("pending_full_and_ack_manifest_confirmed", func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) peerValidatorID := parachaintypes.ValidatorIndex(4) @@ -526,12 +544,16 @@ func TestSendPendingGridMessages(t *testing.T) { } func TestHandleIncomingManifestCommon(t *testing.T) { + t.Parallel() + pierre := peer.ID("pierre") candidateHash := parachaintypes.CandidateHash{Value: common.Hash{0x12}} relayParent := common.Hash{0xab} paraID := parachaintypes.ParaID(10) t.Run("peer_not_connected", func(t *testing.T) { + t.Parallel() + sd := StatementDistribution{} importSuccess := sd.handleIncomingManifestCommon( @@ -552,6 +574,8 @@ func TestHandleIncomingManifestCommon(t *testing.T) { }) t.Run("not_in_relay_parent_state", func(t *testing.T) { + t.Parallel() + peers := map[peer.ID]peerState{ pierre: {}, } @@ -598,6 +622,8 @@ func TestHandleIncomingManifestCommon(t *testing.T) { }) t.Run("happy_path", func(t *testing.T) { + t.Parallel() + groupIndex := parachaintypes.GroupIndex(0) manifestSummary := manifestSummary{ From f244c27045891bfa70c4f57d649561983eee6933 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 26 Jun 2025 20:10:23 +0700 Subject: [PATCH 08/12] review feedback --- dot/parachain/statement-distribution/state_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/parachain/statement-distribution/state_v2.go b/dot/parachain/statement-distribution/state_v2.go index e156b5edb2..8a700bcd47 100644 --- a/dot/parachain/statement-distribution/state_v2.go +++ b/dot/parachain/statement-distribution/state_v2.go @@ -114,7 +114,7 @@ func (s *perSessionState) supplyTopology(topology *grid.SessionGridTopology, loc localIdx, ) if err != nil { - logger.Errorf("Failed to build sessionTopologyView for validator index %d: %s", localIdx, err) + logger.Errorf("building sessionTopologyView for validator index %d: %s", localIdx, err) return } @@ -128,7 +128,7 @@ func (s 
*perSessionState) supplyTopology(topology *grid.SessionGridTopology, loc // isNotValidator returns `true` if local is neither active or inactive validator node. // -// `false` is also returned if session topology is not known yet. +// returns `false` if session topology is not known yet. func (s *perSessionState) isNotValidator() bool { return s.gridView != nil && s.localValidator == nil } From ee51a897e843c292bf95e898cc28acd080d3c978 Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 26 Jun 2025 20:12:57 +0700 Subject: [PATCH 09/12] parallel test --- .../statement-distribution/statement_distribution_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 43e773a852..f28330f974 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -745,6 +745,8 @@ func TestHandleIncomingManifestCommon(t *testing.T) { } func TestHandleIncomingManifest(t *testing.T) { + t.Parallel() + overseerCh := make(chan any, 10) sd := &StatementDistribution{ From 2368d17669edbbd68e8680237d0802fbd2c5896c Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Mon, 30 Jun 2025 20:16:50 +0700 Subject: [PATCH 10/12] feat(statement-distribution): Implement requestManager --- .../messages/attested_candidate.go | 91 +++ .../messages/request_response_protocols.go | 3 + .../statement-distribution/request_manager.go | 549 ++++++++++++++++++ .../request_manager_test.go | 118 ++++ .../statement-distribution/state_v2.go | 6 +- .../statement_distribution.go | 16 +- 6 files changed, 776 insertions(+), 7 deletions(-) create mode 100644 dot/parachain/network-bridge/messages/attested_candidate.go create mode 100644 dot/parachain/statement-distribution/request_manager.go create mode 100644 dot/parachain/statement-distribution/request_manager_test.go diff --git 
a/dot/parachain/network-bridge/messages/attested_candidate.go b/dot/parachain/network-bridge/messages/attested_candidate.go new file mode 100644 index 0000000000..d0c7821709 --- /dev/null +++ b/dot/parachain/network-bridge/messages/attested_candidate.go @@ -0,0 +1,91 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package messages + +import ( + "fmt" + "time" + + "github.com/ChainSafe/gossamer/dot/network" + parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "github.com/ChainSafe/gossamer/pkg/scale" +) + +const ( + // AttestedCandidateTimeout We want attested candidate requests to time out relatively fast, + // because slow requests will bottleneck the backing system. Ideally, we'd have + // an adaptive timeout based on the candidate size, because there will be a lot of variance + // in candidate sizes: candidates with no code and no messages vs candidates with code + // and messages. + // + // We supply leniency because there are often large candidates and asynchronous + // backing allows them to be included over a longer window of time. Exponential back-off + // up to a maximum of 10 seconds would be ideal, but isn't supported by the + // infrastructure here yet: see https://github.com/paritytech/polkadot/issues/6009 + AttestedCandidateTimeout = time.Millisecond * 2500 + + // MaxParallelAttestedCandidateRequests We don't want a slow peer to slow down all the others, + // at the same time we want to get out the data quickly in full to at least some peers + // (as this will reduce load on us as they then can start serving the data). So this value is a tradeoff. + // 5 seems to be sensible. So we would need to have 5 slow nodes connected, to delay transfer for others + // by [AttestedCandidateTimeout]. + MaxParallelAttestedCandidateRequests = 5 +) + +// AttestedCandidateRequest is a request for a candidate with statements. 
+type AttestedCandidateRequest struct { + // Hash of the candidate we want to request. + CandidateHash parachaintypes.CandidateHash + // Statement filter with 'OR' semantics, indicating which validators + // not to send statements for. + // + // The filter must have exactly the minimum size required to + // fit all validators from the backing group. + // + // The response may not contain any statements masked out by this mask. + Mask parachaintypes.StatementFilter +} + +// Encode returns the SCALE encoding of the AttestedCandidateRequest +func (r *AttestedCandidateRequest) Encode() ([]byte, error) { + return scale.Marshal(*r) +} + +// Decode returns the SCALE decoding of the AttestedCandidateRequest +func (r *AttestedCandidateRequest) Decode(in []byte) (err error) { + return scale.Unmarshal(in, r) +} + +func (r *AttestedCandidateRequest) Response() network.ResponseMessage { + return &AttestedCandidateResponse{} +} + +func (r *AttestedCandidateRequest) Protocol() ReqProtocolName { + return AttestedCandidateV2 +} + +// AttestedCandidateResponse is the response to an [AttestedCandidateRequest]. 
+type AttestedCandidateResponse struct { + CandidateReceipt parachaintypes.CommittedCandidateReceiptV2 + PersistedValidationData parachaintypes.PersistedValidationData + Statements []any // TODO: implement parachaintypes.UncheckedSignedStatement +} + +// Encode returns the SCALE encoding of the AttestedCandidateResponse +func (r *AttestedCandidateResponse) Encode() ([]byte, error) { + return scale.Marshal(*r) +} + +// Decode returns the SCALE decoding of the AttestedCandidateResponse +func (r *AttestedCandidateResponse) Decode(in []byte) (err error) { + return scale.Unmarshal(in, r) +} + +// String returns a human-readable representation of the AttestedCandidateResponse +func (r *AttestedCandidateResponse) String() string { + return fmt.Sprintf( + "AttestedCandidateResponse CandidateReceipt=%v", + r.CandidateReceipt, + ) +} diff --git a/dot/parachain/network-bridge/messages/request_response_protocols.go b/dot/parachain/network-bridge/messages/request_response_protocols.go index cdf31357c3..4aa99f5bc0 100644 --- a/dot/parachain/network-bridge/messages/request_response_protocols.go +++ b/dot/parachain/network-bridge/messages/request_response_protocols.go @@ -21,6 +21,7 @@ const ( AvailableDataFetchingV1 StatementFetchingV1 DisputeSendingV1 + AttestedCandidateV2 ) func (n ReqProtocolName) String() string { @@ -39,6 +40,8 @@ func (n ReqProtocolName) String() string { return "req_statement/1" case DisputeSendingV1: return "send_dispute/1" + case AttestedCandidateV2: + return "/req_attested_candidate/2" default: panic("unknown protocol") } diff --git a/dot/parachain/statement-distribution/request_manager.go b/dot/parachain/statement-distribution/request_manager.go new file mode 100644 index 0000000000..2e82f1672d --- /dev/null +++ b/dot/parachain/statement-distribution/request_manager.go @@ -0,0 +1,549 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package statementdistribution + +import ( + "bytes" + "fmt" + "slices" + "time" 
+ + "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages" + parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p/core/peer" +) + +type origin byte + +const ( + cluster origin = 0 + unspecified origin = 1 +) + +// candidateIdentifier is an identifier for a candidate. +// +// In the [requestManager], we are requesting candidates for which we have no information other +// than the candidate hash and statements signed by validators. It is possible for +// validators for multiple groups to abuse this lack of information: until we actually +// get the preimage of this candidate we cannot confirm anything other than the candidate hash. +type candidateIdentifier struct { + // The relay-parent this candidate is ostensibly under. + relayParent common.Hash + // The hash of the candidate. + candidateHash parachaintypes.CandidateHash + // The index of the group claiming to be assigned to the candidate's para. 
+ groupIndex parachaintypes.GroupIndex +} + +func (ci candidateIdentifier) String() string { + return fmt.Sprintf( + "relayParent=%s candidateHash=%s groupIndex=%d", + ci.relayParent, + ci.candidateHash, + ci.groupIndex, + ) +} + +func (ci candidateIdentifier) compare(other candidateIdentifier) int { + if comp := bytes.Compare(ci.relayParent[:], other.relayParent[:]); comp != 0 { + return comp + } + + if comp := bytes.Compare(ci.candidateHash.Value[:], other.candidateHash.Value[:]); comp != 0 { + return comp + } + + return int(ci.groupIndex) - int(other.groupIndex) +} + +type candidateIdentifierSet map[candidateIdentifier]struct{} + +func (s candidateIdentifierSet) insert(id candidateIdentifier) { + s[id] = struct{}{} +} + +func (s candidateIdentifierSet) contains(id candidateIdentifier) bool { //nolint:unused + _, ok := s[id] + return ok +} + +func (s candidateIdentifierSet) retain(keep func(candidateIdentifier) bool) { + for id := range s { + if !keep(id) { + delete(s, id) + } + } +} + +// requestProperties is used in target selection and validation of a request. +type requestProperties struct { //nolint:unused + // A mask for limiting the statements the response is allowed to contain. + // The mask has `OR` semantics: statements by validators corresponding to bits + // in the mask are not desired. It also returns the required backing threshold + // for the candidate. + unwantedMask parachaintypes.StatementFilter + + // The required backing threshold, if any. If this is non-nil, then requests will only + // be made to peers which can provide enough statements to back the candidate, when + // taking into account the `unwantedMask`, and a response will only be validated + // in the case of those statements. + // + // If this is nil, it is assumed that only the candidate itself is needed. 
+ backingThreshold *int +} + +type taggedResponse struct { //nolint:unused + identifier candidateIdentifier + requestedPeer peer.ID + props requestProperties + // payload is AttestedCandidateResponse, maybe ReqRespResult should be generic over the response type? + response chan messages.ReqRespResult +} + +type unhandledResponse struct { //nolint:unused + response taggedResponse +} + +type priority struct { + origin origin + attempts uint +} + +func (p priority) compare(other priority) int { + if p.origin != other.origin { + return int(p.origin) - int(other.origin) + } + + return int(p.attempts - other.attempts) +} + +// requestedCandidate represents a pending request. +type requestedCandidate struct { + priority priority + knownBy []peer.ID // polkadot-sdk uses VecDeque + // Has the request been sent out and a response not yet received? + inFlight bool + // The timestamp for the next time we should retry, if the response failed. + nextRetryTime *time.Time +} + +func (rc requestedCandidate) isPending() bool { //nolint:unused + if rc.inFlight { + return false + } + + if rc.nextRetryTime != nil { + if time.Now().Before(*rc.nextRetryTime) { + return false + } + } + + return true +} + +type priorityCandidatePair struct { + priority + candidateIdentifier +} + +type priorityCandidatePairs []priorityCandidatePair + +func (p *priorityCandidatePairs) binarySearch(priority priority, candidateIdentifier candidateIdentifier) (int, bool) { + target := priorityCandidatePair{priority, candidateIdentifier} + + return slices.BinarySearchFunc( + *p, + target, + func(pair priorityCandidatePair, target priorityCandidatePair) int { + if comp := pair.priority.compare(target.priority); comp != 0 { + return comp + } + return pair.candidateIdentifier.compare(target.candidateIdentifier) + }, + ) +} + +func (p *priorityCandidatePairs) remove(index int) { + *p = slices.Delete(*p, index, index+1) +} + +func (p *priorityCandidatePairs) insert(prio priority, id candidateIdentifier, index int) { + *p 
= slices.Insert(*p, index, priorityCandidatePair{prio, id}) +} + +func (p *priorityCandidatePairs) retain(keep func(priorityCandidatePair) bool) { + n := 0 + for _, pair := range *p { + if keep(pair) { + (*p)[n] = pair + n++ + } + } + *p = (*p)[:n] +} + +// entry is used for manipulating a [requestedCandidate] stored in [requestManager] state. +type entry struct { + prevIndex int + identifier candidateIdentifier + byPriority *priorityCandidatePairs // using a pointer to enable append/delete ops on the slice + requested *requestedCandidate +} + +// addPeer adds a peer to the set of known peers. +func (e *entry) addPeer(p peer.ID) { + if !slices.Contains(e.requested.knownBy, p) { + e.requested.knownBy = append(e.requested.knownBy, p) + } +} + +// setClusterPriority notes that the candidate is required for the cluster. +// Calling this method mutates the state of the [requestManager] object from which this entry was obtained by calling +// requestManager.getOrInsert(). +func (e *entry) setClusterPriority() { + e.requested.priority.origin = cluster + + e.prevIndex = insertOrUpdatePriority( + e.byPriority, + &e.prevIndex, + e.identifier, + e.requested.priority, + ) +} + +type requestManager struct { + requests map[candidateIdentifier]*requestedCandidate + // sorted by priority. + byPriority priorityCandidatePairs + // all unique identifiers for the candidate. + uniqueIdentifiers map[parachaintypes.CandidateHash]candidateIdentifierSet +} + +func newRequestManager() *requestManager { + return &requestManager{ + requests: make(map[candidateIdentifier]*requestedCandidate), + byPriority: make(priorityCandidatePairs, 0), + uniqueIdentifiers: make(map[parachaintypes.CandidateHash]candidateIdentifierSet), + } +} + +// getOrInsert gets an [entry] for mutating a request and inserts it if the +// manager doesn't store this request already. 
+func (rm *requestManager) getOrInsert( + relayParent common.Hash, + candidateHash parachaintypes.CandidateHash, + groupIndex parachaintypes.GroupIndex, +) *entry { + identifier := candidateIdentifier{relayParent, candidateHash, groupIndex} + var priorityIndex int + + candidate, knownRequest := rm.requests[identifier] + if !knownRequest { + candidate = &requestedCandidate{ + priority: priority{origin: unspecified, attempts: 0}, + knownBy: make([]peer.ID, 0), + inFlight: false, + nextRetryTime: nil, + } + + rm.requests[identifier] = candidate + + if uniques, ok := rm.uniqueIdentifiers[candidateHash]; !ok { + rm.uniqueIdentifiers[candidateHash] = candidateIdentifierSet{ + identifier: {}, + } + } else { + uniques.insert(identifier) + } + + priorityIndex = insertOrUpdatePriority( + &rm.byPriority, + nil, + identifier, + candidate.priority, + ) + } else { + var found bool + priorityIndex, found = rm.byPriority.binarySearch(candidate.priority, identifier) + if !found { + panic("requested candidates always have a priority entry; qed") + } + } + + return &entry{ + prevIndex: priorityIndex, + identifier: identifier, + byPriority: &rm.byPriority, + requested: candidate, + } +} + +// removeFor removes all pending requests for the given candidate. +func (rm *requestManager) removeFor(candidate parachaintypes.CandidateHash) { //nolint:unused + identifiers, ok := rm.uniqueIdentifiers[candidate] + if !ok { + return + } + + rm.byPriority.retain(func(pair priorityCandidatePair) bool { + return !identifiers.contains(pair.candidateIdentifier) + }) + + for id := range identifiers { + delete(rm.requests, id) + } +} + +// removeByRelayParent removes pending requests based on relay-parent. +func (rm *requestManager) removeByRelayParent(relayParent common.Hash) { + candidateHashes := make(map[parachaintypes.CandidateHash]struct{}) + + // Remove from byPriority and requests. 
+ rm.byPriority.retain(func(pair priorityCandidatePair) bool { + id := pair.candidateIdentifier + retain := relayParent != id.relayParent + if !retain { + delete(rm.requests, id) + candidateHashes[id.candidateHash] = struct{}{} + } + return retain + }) + + // Remove from uniqueIdentifiers. + for candidateHash := range candidateHashes { + uniqueIds, ok := rm.uniqueIdentifiers[candidateHash] + if ok { + uniqueIds.retain(func(id candidateIdentifier) bool { + return relayParent != id.relayParent + }) + + if len(uniqueIds) == 0 { + delete(rm.uniqueIdentifiers, candidateHash) + } + } + } + + logger.Debugf("Requests remaining after cleanup: %d", len(rm.byPriority)) +} + +// hasPendingRequests returns true if there are pending requests that are dispatchable. +func (rm *requestManager) hasPendingRequests() bool { //nolint:unused + for _, request := range rm.requests { + if request.isPending() { + return true + } + } + return false +} + +// nextRetryTime returns the time at which the next request to be retried will be ready. +func (rm *requestManager) nextRetryTime() *time.Time { //nolint:unused + var next *time.Time + + for _, request := range rm.requests { + if request.inFlight || request.nextRetryTime == nil { + continue + } + + if next == nil || request.nextRetryTime.Before(*next) { + next = request.nextRetryTime + } + } + + return next +} + +// TODO: replace with implementation (#4378) +type responseManager interface { //nolint:unused + incoming() *unhandledResponse + len() int + push(taggedResponse, peer.ID) + isSendingTo(peer.ID) bool +} + +// nextRequest yields the next request to dispatch, if there is any. +// +// This function accepts two closures as an argument. +// +// The first closure is used to gather information about the desired +// properties of a response, which is used to select targets and validate +// the response later on. 
+// +// The second closure is used to determine the specific advertised +// statements by a peer, to be compared against the mask and backing +// threshold and returns `None` if the peer is no longer connected. +func (rm *requestManager) nextRequest( //nolint:unused + responseManager responseManager, + requestProps func(candidateIdentifier) *requestProperties, + peerAdvertised func(candidateIdentifier, peer.ID) *parachaintypes.StatementFilter, +) *messages.OutgoingRequest { + // The number of parallel requests a node can answer is limited by + // `MAX_PARALLEL_ATTESTED_CANDIDATE_REQUESTS`, however there is no + // need for the current node to limit itself to the same amount of + // requests, because the requests are going to different nodes anyways. + // While looking at https://github.com/paritytech/polkadot-sdk/issues/3314, + // found out that these requests take around 100ms to fulfil, so it + // would make sense to try to request things as early as we can, given + // we would need to request it for each candidate, around 25 right now + // on kusama. 
+ if responseManager.len() >= 2*messages.MaxParallelAttestedCandidateRequests { + return nil + } + + var res *messages.OutgoingRequest + + type idWithIndex struct { + id candidateIdentifier + index int + } + var cleanupOutdated []idWithIndex + + for i, pair := range rm.byPriority { + id := pair.candidateIdentifier + entry, ok := rm.requests[id] + if !ok { + logger.Errorf("Missing entry for priority queue member identifier=%s", id.String()) + continue + } + + if !entry.isPending() { + continue + } + + props := requestProps(id) + if props == nil { + cleanupOutdated = append(cleanupOutdated, idWithIndex{id, i}) + continue + } + + target := findRequestTargetWithUpdate( + &entry.knownBy, + id, + *props, + peerAdvertised, + responseManager, + ) + if target == nil { + continue + } + + logger.Debugf("Issuing candidate request candidateHash=%s peer=%s", id.candidateHash.String(), *target) + + res = messages.NewOutgoingRequest( + parachaintypes.PeerID(*target), + &messages.AttestedCandidateRequest{ + CandidateHash: id.candidateHash, + Mask: props.unwantedMask, + }, + ) + + responseManager.push( + taggedResponse{ + identifier: id, + requestedPeer: *target, + props: *props, + response: res.Result, + }, + *target, + ) + + break + } + + for _, idIdx := range slices.Backward(cleanupOutdated) { + rm.byPriority.remove(idIdx.index) + delete(rm.requests, idIdx.id) + + identifiers, ok := rm.uniqueIdentifiers[idIdx.id.candidateHash] + if ok { + delete(identifiers, idIdx.id) + if len(identifiers) == 0 { + delete(rm.uniqueIdentifiers, idIdx.id.candidateHash) + } + } + } + + return res +} + +func insertOrUpdatePriority( + prioritySorted *priorityCandidatePairs, + prevIndex *int, + candidateIdentifier candidateIdentifier, + newPriority priority, +) int { + if prevIndex != nil { + // GIGO: this behaves strangely if prev-index is not for the + // expected identifier. + if (*prioritySorted)[*prevIndex].priority == newPriority { + // unchanged. 
+ return *prevIndex + } else { + prioritySorted.remove(*prevIndex) + } + } + + index, found := prioritySorted.binarySearch(newPriority, candidateIdentifier) + if !found { + prioritySorted.insert(newPriority, candidateIdentifier, index) + } + + return index +} + +// findRequestTargetWithUpdate finds a valid request target, returning nil if none exists. +// Cleans up disconnected peers and places the returned peer at the back of the queue. +// The method mutates the `knownBy` argument. +func findRequestTargetWithUpdate( //nolint:unused + knownBy *[]peer.ID, + candidateIdentifier candidateIdentifier, + props requestProperties, + peerAdvertised func(candidateIdentifier, peer.ID) *parachaintypes.StatementFilter, + responseManager responseManager, +) *peer.ID { + if knownBy == nil { + logger.Debugf("Unexpected call to findRequestTargetWithUpdate() with nil knownBy argument") + return nil + } + + type peerWithIndex struct { + peer peer.ID + index int + } + var target *peerWithIndex + var prune []int + + for i, p := range *knownBy { + // If we are already sending to that peer, skip for now + if responseManager.isSendingTo(p) { + continue + } + + filter := peerAdvertised(candidateIdentifier, p) + if filter == nil { + prune = append(prune, i) + continue + } + + filter.MaskSeconded(props.unwantedMask.SecondedInGroup) + filter.MaskValid(props.unwantedMask.ValidatedInGroup) + if secondedAndSufficient(filter, props.backingThreshold) { + target = &peerWithIndex{p, i} + break + } + } + + for _, idx := range slices.Backward(prune) { + *knownBy = slices.Delete(*knownBy, idx, idx+1) + } + + if target != nil { + idx := target.index - len(prune) + *knownBy = slices.Delete(*knownBy, idx, idx+1) + return &target.peer + } + + return nil +} diff --git a/dot/parachain/statement-distribution/request_manager_test.go b/dot/parachain/statement-distribution/request_manager_test.go new file mode 100644 index 0000000000..7383d7e66d --- /dev/null +++ b/dot/parachain/statement-distribution/request_manager_test.go @@ 
-0,0 +1,118 @@ +// Copyright 2025 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package statementdistribution + +import ( + "testing" + + parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/stretchr/testify/require" +) + +func TestRequestManager(t *testing.T) { + parentA := common.Hash{1} + parentB := common.Hash{2} + parentC := common.Hash{3} + + candidateA1 := parachaintypes.CandidateHash{Value: common.Hash{11}} + candidateA2 := parachaintypes.CandidateHash{Value: common.Hash{12}} + candidateB1 := parachaintypes.CandidateHash{Value: common.Hash{21}} + candidateB2 := parachaintypes.CandidateHash{Value: common.Hash{22}} + candidateC1 := parachaintypes.CandidateHash{Value: common.Hash{31}} + duplicateHash := parachaintypes.CandidateHash{Value: common.Hash{31}} + + t.Run("removeByRelayParent", func(t *testing.T) { + manager := newRequestManager() + manager.getOrInsert(parentA, candidateA1, 1) + manager.getOrInsert(parentA, candidateA2, 1) + manager.getOrInsert(parentB, candidateB1, 1) + manager.getOrInsert(parentB, candidateB2, 2) + manager.getOrInsert(parentC, candidateC1, 2) + manager.getOrInsert(parentA, duplicateHash, 1) + + require.Len(t, manager.requests, 6) + require.Len(t, manager.byPriority, 6) + require.Len(t, manager.uniqueIdentifiers, 5) + + manager.removeByRelayParent(parentA) + + require.Len(t, manager.requests, 3) + require.Len(t, manager.byPriority, 3) + require.Len(t, manager.uniqueIdentifiers, 3) + + _, ok := manager.uniqueIdentifiers[candidateA1] + require.False(t, ok) + + _, ok = manager.uniqueIdentifiers[candidateA2] + require.False(t, ok) + + // Duplicate hash should still be there (under a different parent). 
+ _, ok = manager.uniqueIdentifiers[duplicateHash] + require.True(t, ok) + + manager.removeByRelayParent(parentB) + + require.Len(t, manager.requests, 1) + require.Len(t, manager.byPriority, 1) + require.Len(t, manager.uniqueIdentifiers, 1) + + _, ok = manager.uniqueIdentifiers[candidateB1] + require.False(t, ok) + + _, ok = manager.uniqueIdentifiers[candidateB2] + require.False(t, ok) + + manager.removeByRelayParent(parentC) + + require.Empty(t, manager.requests) + require.Empty(t, manager.byPriority) + require.Empty(t, manager.uniqueIdentifiers) + }) + + t.Run("test_priority_ordering", func(t *testing.T) { + manager := newRequestManager() + + // Add some entries, set a couple of them to cluster (high) priority. + identifierA1 := manager.getOrInsert(parentA, candidateA1, 1).identifier + + entry := manager.getOrInsert(parentA, candidateA2, 1) + entry.setClusterPriority() + identifierA2 := entry.identifier + + identifierB1 := manager.getOrInsert(parentB, candidateB1, 1).identifier + + identifierB2 := manager.getOrInsert(parentB, candidateB2, 2).identifier + + entry = manager.getOrInsert(parentC, candidateC1, 2) + entry.setClusterPriority() + identifierC1 := entry.identifier + + require.Equal( + t, + manager.byPriority, + priorityCandidatePairs{ + { + priority: priority{origin: cluster, attempts: 0}, + candidateIdentifier: identifierA2, + }, + { + priority: priority{origin: cluster, attempts: 0}, + candidateIdentifier: identifierC1, + }, + { + priority: priority{origin: unspecified, attempts: 0}, + candidateIdentifier: identifierA1, + }, + { + priority: priority{origin: unspecified, attempts: 0}, + candidateIdentifier: identifierB1, + }, + { + priority: priority{origin: unspecified, attempts: 0}, + candidateIdentifier: identifierB2, + }, + }) + }) +} diff --git a/dot/parachain/statement-distribution/state_v2.go b/dot/parachain/statement-distribution/state_v2.go index 522e710e99..30322efe2e 100644 --- a/dot/parachain/statement-distribution/state_v2.go +++ 
b/dot/parachain/statement-distribution/state_v2.go @@ -61,7 +61,7 @@ type activeValidatorState struct { index parachaintypes.ValidatorIndex groupIndex parachaintypes.GroupIndex assignments []parachaintypes.ParaID - clusterTracker any // TODO: use cluster tracker implementation (#4713) + clusterTracker clusterTracker } // skipcq:SCC-U1000 @@ -220,6 +220,6 @@ type v2State struct { peers map[peer.ID]peerState keystore keystore.Keystore authorities map[parachaintypes.AuthorityDiscoveryID]string - requestManager any // TODO: #4377 - responseManager any // TODO: #4378 + requestManager *requestManager + responseManager responseManager // TODO: when switching to implementation, might want to use a pointer here (#4378) } diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 44f6585083..644431c6d9 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -587,10 +587,9 @@ func (s *StatementDistribution) handleIncomingManifest( // 5. if unconfirmed, add request entry logger.Tracef("Unknown candidate - requesting candidateHash=%s", manifest.CandidateHash.String()) - // TODO #4377 - //state.requestManager. - // getOrInsert(manifest.RelayParent, manifest.CandidateHash, manifest.GroupIndex). - // addPeer(peer) + state.requestManager. + getOrInsert(manifest.RelayParent, manifest.CandidateHash, manifest.GroupIndex). + addPeer(peer) } } @@ -882,3 +881,12 @@ func pendingStatementNetworkMessage( return nil } + +// secondedAndSufficient returns true if the statement filter meets the backing threshold for grid requests. 
+func secondedAndSufficient(filter *parachaintypes.StatementFilter, backingThreshold *int) bool { //nolint:unused + if backingThreshold == nil { + return true + } + + return filter.HasSeconded() && filter.BackingValidators() >= *backingThreshold +} From 1cb193d7efe39ae022a29a5a6fd26d29387f97bb Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Thu, 3 Jul 2025 17:11:26 +0700 Subject: [PATCH 11/12] lint --- dot/parachain/statement-distribution/statement_distribution.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 959259a320..7668962128 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -145,7 +145,7 @@ func (s *StatementDistribution) awaitMessageFrom( // upon learning about a new relay parent. func (s *StatementDistribution) sendPendingGridMessages( rp common.Hash, - peerID peer.ID, //nolint:unparam + peerID peer.ID, //nolint:unparam validationVersion validationprotocol.ValidationVersion, //nolint:unparam peerValidatorID parachaintypes.ValidatorIndex, groups *groups, From c748f5b9b698fe0ea03bbeeebfc6032d78f23d5c Mon Sep 17 00:00:00 2001 From: Haiko Schol Date: Fri, 4 Jul 2025 12:28:19 +0700 Subject: [PATCH 12/12] address review feedback --- .../statement-distribution/request_manager.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/dot/parachain/statement-distribution/request_manager.go b/dot/parachain/statement-distribution/request_manager.go index 2e82f1672d..79a30e2cf5 100644 --- a/dot/parachain/statement-distribution/request_manager.go +++ b/dot/parachain/statement-distribution/request_manager.go @@ -18,8 +18,8 @@ import ( type origin byte const ( - cluster origin = 0 - unspecified origin = 1 + cluster origin = iota + unspecified ) // candidateIdentifier is an identifier for a 
candidate. @@ -134,10 +134,8 @@ func (rc requestedCandidate) isPending() bool { //nolint:unused return false } - if rc.nextRetryTime != nil { - if time.Now().Before(*rc.nextRetryTime) { - return false - } + if rc.nextRetryTime != nil && time.Now().Before(*rc.nextRetryTime) { + return false } return true @@ -480,9 +478,8 @@ func insertOrUpdatePriority( if (*prioritySorted)[*prevIndex].priority == newPriority { // unchanged. return *prevIndex - } else { - prioritySorted.remove(*prevIndex) } + prioritySorted.remove(*prevIndex) } index, found := prioritySorted.binarySearch(newPriority, candidateIdentifier)