Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p *PubSub) getHelloPacket() *RPC {

for t := range subscriptions {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: t},
Subscribe: proto.Bool(true),
}
rpc.Subscriptions = append(rpc.Subscriptions, as)
Expand Down
2 changes: 1 addition & 1 deletion compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestMultitopicMessageCompatibility(t *testing.T) {
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
Topic: &topic1,
TopicRef: &pb.Message_Topic{Topic: topic1},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
Expand Down
77 changes: 70 additions & 7 deletions extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import (
)

type PeerExtensions struct {
TestExtension bool
TestExtension bool
TopicTableExtension *pubsub_pb.ExtTopicTable
}

type TestExtensionConfig struct {
OnReceiveTestExtension func(from peer.ID)
}

type TopicTableExtensionConfig struct {
TopicBundles [][]string
}

func WithTestExtension(c TestExtensionConfig) Option {
return func(ps *PubSub) error {
if rt, ok := ps.rt.(*GossipSubRouter); ok {
Expand All @@ -26,6 +31,21 @@ func WithTestExtension(c TestExtensionConfig) Option {
}
}

func WithTopicTableExtension(c TopicTableExtensionConfig) Option {
return func(ps *PubSub) error {
if rt, ok := ps.rt.(*GossipSubRouter); ok {
e, err := newTopicTableExtension(c.TopicBundles)
if err != nil {
return err
}

rt.extensions.myExtensions.TopicTableExtension = e.GetControlExtension()
rt.extensions.topicTableExtension = e
}
return nil
}
}

func hasPeerExtensions(rpc *RPC) bool {
if rpc != nil && rpc.Control != nil && rpc.Control.Extensions != nil {
return true
Expand All @@ -37,18 +57,29 @@ func peerExtensionsFromRPC(rpc *RPC) PeerExtensions {
out := PeerExtensions{}
if hasPeerExtensions(rpc) {
out.TestExtension = rpc.Control.Extensions.GetTestExtension()
out.TopicTableExtension = rpc.Control.Extensions.GetTopicTableExtension()
}
return out
}

func (pe *PeerExtensions) ExtendRPC(rpc *RPC) *RPC {
func (pe *PeerExtensions) ExtendHelloRPC(rpc *RPC) *RPC {
if pe.TestExtension {
if rpc.Control == nil {
rpc.Control = &pubsub_pb.ControlMessage{}
}
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{
TestExtension: &pe.TestExtension,
if rpc.Control.Extensions == nil {
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{}
}
rpc.Control.Extensions.TestExtension = &pe.TestExtension
}
if pe.TopicTableExtension != nil {
if rpc.Control == nil {
rpc.Control = &pubsub_pb.ControlMessage{}
}
if rpc.Control.Extensions == nil {
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{}
}
rpc.Control.Extensions.TopicTableExtension = pe.TopicTableExtension
}
return rpc
}
Expand All @@ -60,7 +91,8 @@ type extensionsState struct {
reportMisbehavior func(peer.ID)
sendRPC func(p peer.ID, r *RPC, urgent bool)

testExtension *testExtension
testExtension *testExtension
topicTableExtension *topicTableExtension
}

func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer.ID), sendRPC func(peer.ID, *RPC, bool)) *extensionsState {
Expand All @@ -70,7 +102,9 @@ func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer
sentExtensions: make(map[peer.ID]struct{}),
reportMisbehavior: reportMisbehavior,
sendRPC: sendRPC,
testExtension: nil,

testExtension: nil,
topicTableExtension: nil,
}
}

Expand All @@ -96,9 +130,16 @@ func (es *extensionsState) HandleRPC(rpc *RPC) {
es.extensionsHandleRPC(rpc)
}

func (es *extensionsState) InterceptRPC(rpc *RPC) *RPC {
if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[rpc.from].TopicTableExtension != nil {
rpc = es.topicTableExtension.InterceptRPC(rpc)
}
return rpc
}

func (es *extensionsState) AddPeer(id peer.ID, helloPacket *RPC) *RPC {
// Send our extensions as the first message.
helloPacket = es.myExtensions.ExtendRPC(helloPacket)
helloPacket = es.myExtensions.ExtendHelloRPC(helloPacket)

es.sentExtensions[id] = struct{}{}
if _, ok := es.peerExtensions[id]; ok {
Expand Down Expand Up @@ -126,12 +167,34 @@ func (es *extensionsState) RemovePeer(id peer.ID) {
}
}

func (es *extensionsState) ExtendRPC(id peer.ID, rpc *RPC) *RPC {
if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[id].TopicTableExtension != nil {
rpc = es.topicTableExtension.ExtendRPC(id, rpc)
}
return rpc
}

// extensionsAddPeer is only called once we've both sent and received the
// extensions control message.
func (es *extensionsState) extensionsAddPeer(id peer.ID) {
if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension {
es.testExtension.AddPeer(id)
}
if es.myExtensions.TopicTableExtension != nil && es.peerExtensions[id].TopicTableExtension != nil {
hashSlices := es.peerExtensions[id].TopicTableExtension.GetTopicBundleHashes()
// Parsing the slices of bytes, to get a slice of TopicBundleHash
bundleHashes := make([]TopicBundleHash, len(hashSlices))
for _, buf := range hashSlices {
hash, err := newTopicBundleHash(buf)
if err != nil {
// If there is an error parsing the hash, just quietly return
return
}
bundleHashes = append(bundleHashes, *hash)
}
// If there is an error adding a peer, just quietly skip it
_ = es.topicTableExtension.AddPeer(id, bundleHashes)
}
}

// extensionsRemovePeer is always called after extensionsAddPeer.
Expand Down
4 changes: 4 additions & 0 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID, hello *RPC) *RPC
return hello
}

func (fs *FloodSubRouter) InterceptRPC(rpc *RPC) *RPC {
return rpc
}

func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
Expand Down
8 changes: 4 additions & 4 deletions fuzz_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func generateSub(data []byte, limit int) *pb.RPC_SubOpts {
subscribe := generateBool(&data)

str := string(make([]byte, topicIDSize))
return &pb.RPC_SubOpts{Subscribe: &subscribe, Topicid: &str}
return &pb.RPC_SubOpts{Subscribe: &subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: str}}
}

func generateControl(data []byte, limit int) *pb.ControlMessage {
Expand Down Expand Up @@ -65,7 +65,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage {
msgCount := int(generateU16(&data)) % limit
topicSize := int(generateU16(&data)) % limit
topic := string(make([]byte, topicSize))
ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicID: &topic})
ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicRef: &pb.ControlIHave_TopicID{TopicID: topic}})

ctl.Ihave[i].MessageIDs = make([]string, 0, msgCount)
for j := 0; j < msgCount; j++ {
Expand All @@ -81,7 +81,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage {
for i := 0; i < numGraft; i++ {
topicSize := int(generateU16(&data)) % limit
topic := string(make([]byte, topicSize))
ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicID: &topic})
ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicRef: &pb.ControlGraft_TopicID{TopicID: topic}})
}
if ctl.Size() > limit {
return &pb.ControlMessage{}
Expand All @@ -92,7 +92,7 @@ func generateControl(data []byte, limit int) *pb.ControlMessage {
for i := 0; i < numPrune; i++ {
topicSize := int(generateU16(&data)) % limit
topic := string(make([]byte, topicSize))
ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicID: &topic})
ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}})
}
if ctl.Size() > limit {
return &pb.ControlMessage{}
Expand Down
21 changes: 16 additions & 5 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,13 @@ loop:
return helloPacket
}

func (gs *GossipSubRouter) InterceptRPC(rpc *RPC) *RPC {
if gs.feature(GossipSubFeatureExtensions, gs.peers[rpc.from]) {
return gs.extensions.InterceptRPC(rpc)
}
return rpc
}

func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
gs.logger.Debug("PEERDOWN: Remove disconnected peer", "peer", p)
gs.tracer.RemovePeer(p)
Expand Down Expand Up @@ -1463,7 +1470,7 @@ func (gs *GossipSubRouter) Leave(topic string) {
}

func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
graft := []*pb.ControlGraft{{TopicID: &topic}}
graft := []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: topic}}}
out := rpcWithControl(nil, nil, nil, graft, nil, nil)
gs.sendRPC(p, out, false)
}
Expand Down Expand Up @@ -1532,6 +1539,10 @@ func (gs *GossipSubRouter) doDropRPC(rpc *RPC, p peer.ID, reason string) {

func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bool) {
var err error
// let extensions modify the rpc before sending if the other peer supports extensions
if gs.feature(GossipSubFeatureExtensions, gs.peers[p]) {
rpc = gs.extensions.ExtendRPC(p, rpc)
}
if urgent {
err = q.UrgentPush(rpc, false)
} else {
Expand Down Expand Up @@ -1925,7 +1936,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
// topic here changes with every
// iteration of the slice.
copiedID := topic
graft = append(graft, &pb.ControlGraft{TopicID: &copiedID})
graft = append(graft, &pb.ControlGraft{TopicRef: &pb.ControlGraft_TopicID{TopicID: copiedID}})
}

var prune []*pb.ControlPrune
Expand Down Expand Up @@ -2007,7 +2018,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
shuffleStrings(mids)
copy(peerMids, mids)
}
gs.enqueueGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: peerMids})
gs.enqueueGossip(p, &pb.ControlIHave{TopicRef: &pb.ControlIHave_TopicID{TopicID: topic}, MessageIDs: peerMids})
}
}

Expand Down Expand Up @@ -2104,7 +2115,7 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
return &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}}
}

backoff := uint64(gs.params.PruneBackoff / time.Second)
Expand All @@ -2130,7 +2141,7 @@ func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsub
}
}

return &pb.ControlPrune{TopicID: &topic, Peers: px, Backoff: &backoff}
return &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: topic}, Peers: px, Backoff: &backoff}
}

func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
Expand Down
40 changes: 20 additions & 20 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}},
})

go func() {
Expand Down Expand Up @@ -194,8 +194,8 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}},
})

sub := sub
Expand All @@ -209,7 +209,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
// Send a bunch of IHAVEs
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
ihavelst := []string{"someid" + strconv.Itoa(i)}
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
ihave := []*pb.ControlIHave{{TopicRef: &pb.ControlIHave_TopicID{TopicID: sub.GetTopicid()}, MessageIDs: ihavelst}}
orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil)
writeMsg(&orpc.RPC)
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
// Send a bunch of IHAVEs
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
ihavelst := []string{"someid" + strconv.Itoa(i+100)}
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
ihave := []*pb.ControlIHave{{TopicRef: &pb.ControlIHave_TopicID{TopicID: sub.GetTopicid()}, MessageIDs: ihavelst}}
orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil)
writeMsg(&orpc.RPC)
}
Expand Down Expand Up @@ -329,14 +329,14 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}},
})

// Graft to the peer on a non-existent topic
nonExistentTopic := "non-existent"
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: &nonExistentTopic}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: nonExistentTopic}}}},
})

go func() {
Expand Down Expand Up @@ -428,9 +428,9 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
graft := []*pb.ControlGraft{{TopicID: sub.Topicid}}
graft := []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: graft},
})

Expand All @@ -452,7 +452,7 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
// Send a PRUNE to remove the attacker node from the legit
// host's mesh
var prune []*pb.ControlPrune
prune = append(prune, &pb.ControlPrune{TopicID: sub.Topicid})
prune = append(prune, &pb.ControlPrune{TopicRef: &pb.ControlPrune_TopicID{TopicID: sub.GetTopicid()}})
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Prune: prune},
})
Expand Down Expand Up @@ -703,8 +703,8 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}},
})

go func() {
Expand All @@ -720,10 +720,10 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
// fail validation and reduce the attacker's score)
for i := 0; i < 100; i++ {
msg := &pb.Message{
Data: []byte("some data" + strconv.Itoa(i)),
Topic: &mytopic,
From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)},
Data: []byte("some data" + strconv.Itoa(i)),
TopicRef: &pb.Message_Topic{Topic: mytopic},
From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)},
}
writeMsg(&pb.RPC{
Publish: []*pb.Message{msg},
Expand Down Expand Up @@ -836,8 +836,8 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the middle peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, TopicRef: &pb.RPC_SubOpts_Topicid{Topicid: sub.GetTopicid()}}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicRef: &pb.ControlGraft_TopicID{TopicID: sub.GetTopicid()}}}},
})

go func() {
Expand Down
Loading