Skip to content

Commit b9ec702

Browse files
committed
gossipsub: implement core topic table extension
1 parent ee80a36 commit b9ec702

File tree

3 files changed

+474
-15
lines changed

3 files changed

+474
-15
lines changed

extensions.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,13 @@ func WithTestExtension(c TestExtensionConfig) Option {
3434
func WithTopicTableExtension(c TopicTableExtensionConfig) Option {
3535
return func(ps *PubSub) error {
3636
if rt, ok := ps.rt.(*GossipSubRouter); ok {
37-
rt.extensions.topicTableExtension = &topicTableExtension{}
38-
39-
bundleHashes := make([][]byte, len(c.topicBundles))
40-
for _, topics := range c.topicBundles {
41-
hash := computeTopicBundleHash(topics)
42-
bundleHashes = append(bundleHashes, hash[:])
43-
}
44-
rt.extensions.myExtensions.TopicTableExtension = &pubsub_pb.ExtTopicTable{
45-
TopicBundleHashes: bundleHashes,
37+
e, err := newTopicTableExtension(c.topicBundles)
38+
if err != nil {
39+
return err
4640
}
41+
42+
rt.extensions.myExtensions.TopicTableExtension = e.GetControlExtension()
43+
rt.extensions.topicTableExtension = e
4744
}
4845
return nil
4946
}
@@ -169,6 +166,21 @@ func (es *extensionsState) extensionsAddPeer(id peer.ID) {
169166
if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension {
170167
es.testExtension.AddPeer(id)
171168
}
169+
if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[id].TopicTableExtension != nil {
170+
hashSlices := es.peerExtensions[id].TopicTableExtension.GetTopicBundleHashes()
171+
// Parsing the slices of bytes, to get a slice of TopicBundleHash
172+
bundleHashes := make([]TopicBundleHash, len(hashSlices))
173+
for _, buf := range hashSlices {
174+
hash, err := newTopicBundleHash(buf)
175+
if err != nil {
176+
// If there is an error parsing the hash, just quietly return
177+
return
178+
}
179+
bundleHashes = append(bundleHashes, *hash)
180+
}
181+
// If there is an error adding a peer, just quietly skip it
182+
_ = es.topicTableExtension.AddPeer(id, bundleHashes)
183+
}
172184
}
173185

174186
// extensionsRemovePeer is always called after extensionsAddPeer.

topictable_extension.go

Lines changed: 149 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,170 @@
11
package pubsub
22

33
import (
4+
"bytes"
45
"crypto/sha256"
6+
"fmt"
7+
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
8+
"github.com/libp2p/go-libp2p/core/peer"
59
"sort"
610
"strings"
711
)
812

13+
type TopicBundleHash [4]byte
14+
15+
func newTopicBundleHash(bytes []byte) (*TopicBundleHash, error) {
16+
if len(bytes) != 4 {
17+
return nil, fmt.Errorf("expected 4 bytes for TopicBundleHash found: %d", len(bytes))
18+
}
19+
var hash TopicBundleHash
20+
copy(hash[:], bytes)
21+
22+
return &hash, nil
23+
}
24+
925
type topicTableExtension struct {
26+
bundleHashes []TopicBundleHash
27+
intersectedHashes map[peer.ID][]TopicBundleHash
28+
29+
indexToName map[TopicBundleHash][]string // bundle hash -> list of topics
30+
nameToIndex map[TopicBundleHash]map[string]int // bundle hash -> topic -> 0-based index in bundle
1031
}
1132

12-
type TopicBundleHash [4]byte
33+
func newTopicTableExtension(myBundles [][]string) (*topicTableExtension, error) {
34+
bundleHashes := make([]TopicBundleHash, 0, len(myBundles))
1335

14-
func computeTopicBundleHash(topics []string) TopicBundleHash {
15-
sortedTopics := make([]string, len(topics))
16-
copy(sortedTopics, topics)
36+
indexToName := make(map[TopicBundleHash][]string)
37+
nameToIndex := make(map[TopicBundleHash]map[string]int)
1738

18-
sort.Strings(sortedTopics)
39+
for _, topics := range myBundles {
40+
sort.Strings(topics)
1941

20-
concatenated := strings.Join(sortedTopics, "")
42+
hash := computeTopicBundleHash(topics)
43+
bundleHashes = append(bundleHashes, hash)
44+
45+
indexToName[hash] = topics
46+
nameToIndex[hash] = make(map[string]int)
47+
for idx, topic := range topics {
48+
nameToIndex[hash][topic] = idx
49+
}
50+
}
51+
if err := validateBundles(bundleHashes); err != nil {
52+
return nil, err
53+
}
54+
e := &topicTableExtension{
55+
bundleHashes: bundleHashes,
56+
intersectedHashes: make(map[peer.ID][]TopicBundleHash),
57+
indexToName: indexToName,
58+
nameToIndex: nameToIndex,
59+
}
60+
return e, nil
61+
}
62+
63+
func (e *topicTableExtension) GetControlExtension() *pubsub_pb.ExtTopicTable {
64+
hashSlices := make([][]byte, 0, len(e.bundleHashes))
65+
66+
for _, hash := range e.bundleHashes {
67+
hashSlices = append(hashSlices, hash[:])
68+
}
69+
return &pubsub_pb.ExtTopicTable{
70+
TopicBundleHashes: hashSlices,
71+
}
72+
}
73+
74+
func (e *topicTableExtension) AddPeer(id peer.ID, bundles []TopicBundleHash) error {
75+
if err := validateBundles(bundles); err != nil {
76+
return err
77+
}
78+
e.intersectedHashes[id] = computeBundleIntersection(e.bundleHashes, bundles)
79+
return nil
80+
}
81+
82+
// Note that topicIndex is 1-based
83+
func (e *topicTableExtension) GetTopicName(id peer.ID, topicIndex int) (string, error) {
84+
if topicIndex < 1 {
85+
return "", fmt.Errorf("Invalid topic index: %d", topicIndex)
86+
}
87+
88+
// Turn the index to 0-based
89+
idx := topicIndex - 1
2190

91+
for _, hash := range e.intersectedHashes[id] {
92+
if idx < len(e.indexToName[hash]) {
93+
return e.indexToName[hash][idx], nil
94+
} else {
95+
idx -= len(e.indexToName[hash])
96+
}
97+
}
98+
return "", fmt.Errorf("Invalid topic index: %d", topicIndex)
99+
}
100+
101+
// It returns a 1-based index
102+
func (e *topicTableExtension) GetTopicIndex(id peer.ID, topicName string) (int, error) {
103+
topicIndex := 0
104+
105+
for _, hash := range e.intersectedHashes[id] {
106+
if idx, ok := e.nameToIndex[hash][topicName]; ok {
107+
topicIndex += idx
108+
// Turn the index to 1-based
109+
topicIndex += 1
110+
return topicIndex, nil
111+
} else {
112+
topicIndex += len(e.nameToIndex[hash])
113+
}
114+
}
115+
return 0, fmt.Errorf("The topic not found: %s", topicName)
116+
}
117+
118+
func validateBundles(bundles []TopicBundleHash) error {
119+
seen := make(map[TopicBundleHash]struct{}, len(bundles))
120+
for _, bundle := range bundles {
121+
if _, ok := seen[bundle]; ok {
122+
return fmt.Errorf("duplicates found")
123+
}
124+
seen[bundle] = struct{}{}
125+
}
126+
return nil
127+
}
128+
129+
// Assume that the topics have been sorted
130+
func computeTopicBundleHash(sortedTopics []string) TopicBundleHash {
131+
concatenated := strings.Join(sortedTopics, "")
22132
hash := sha256.Sum256([]byte(concatenated))
23133

24134
var result TopicBundleHash
25135
copy(result[:], hash[len(hash)-4:])
26136
return result
27137
}
138+
139+
func computeBundleIntersection(first, second []TopicBundleHash) []TopicBundleHash {
140+
var result []TopicBundleHash
141+
142+
// Find common prefix where elements at each index are equal in both slices.
143+
for i := 0; i < min(len(first), len(second)) && bytes.Equal(first[i][:], second[i][:]); i++ {
144+
result = append(result, first[i])
145+
}
146+
147+
// Store the length of the matching prefix. This is our marker.
148+
prefixLen := len(result)
149+
150+
// Build a set of the remaining elements in the first slice after the prefix.
151+
// For each remaining element in the second slice, if it exists in the set,
152+
// add it to the result. (Duplicates possible if not validated up front.)
153+
seen := make(map[TopicBundleHash]struct{})
154+
for _, v := range first[prefixLen:] {
155+
seen[v] = struct{}{}
156+
}
157+
for _, v := range second[prefixLen:] {
158+
if _, ok := seen[v]; ok {
159+
result = append(result, v)
160+
}
161+
}
162+
163+
// Sort the unordered tail lexicographically.
164+
unordered := result[prefixLen:]
165+
sort.Slice(unordered, func(i, j int) bool {
166+
return bytes.Compare(unordered[i][:], unordered[j][:]) < 0
167+
})
168+
169+
return result
170+
}

0 commit comments

Comments
 (0)