Skip to content

Commit

Permalink
remove multi-topic message support
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Oct 8, 2020
1 parent f7f33e1 commit d6c20b5
Show file tree
Hide file tree
Showing 24 changed files with 1,761 additions and 1,436 deletions.
598 changes: 598 additions & 0 deletions compat/compat.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions compat/compat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto2";

package compat.pb;

message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
83 changes: 83 additions & 0 deletions compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package pubsub

import (
"testing"

compat_pb "github.com/libp2p/go-libp2p-pubsub/compat"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

func TestMultitopicMessageCompatibility(t *testing.T) {
topic1 := "topic1"
topic2 := "topic2"

newMessage1 := &pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
Topic: &topic1,
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage1 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage2 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1, topic2},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}

newMessage1b, err := newMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage1b, err := oldMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage2b, err := oldMessage2.Marshal()
if err != nil {
t.Fatal(err)
}

newMessage := new(pb.Message)
oldMessage := new(compat_pb.Message)

err = newMessage.Unmarshal(oldMessage1b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, newMessage.GetTopic())
}

newMessage.Reset()
err = newMessage.Unmarshal(oldMessage2b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic2 {
t.Fatalf("bad topic: expected %s, got %s", topic2, newMessage.GetTopic())
}

err = oldMessage.Unmarshal(newMessage1b)
if err != nil {
t.Fatal(err)
}
topics := oldMessage.GetTopicIDs()
if len(topics) != 1 {
t.Fatalf("expected 1 topic, got %d", len(topics))
}
if topics[0] != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, topics[0])
}
}
15 changes: 2 additions & 13 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,10 @@ func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}

func (fs *FloodSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom

tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
tmap, ok := fs.p.topics[topic]
if !ok {
continue
}

for p := range tmap {
tosend[p] = struct{}{}
}
}
topic := msg.GetTopic()

out := rpcWithMessages(msg.Message)
for pid := range tosend {
for pid := range fs.p.topics[topic] {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
Expand Down
28 changes: 14 additions & 14 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,26 +869,26 @@ func (gs *GossipSubRouter) connector() {

func (gs *GossipSubRouter) Publish(msg *Message) {
gs.mcache.Put(msg.Message)

from := msg.ReceivedFrom
topic := msg.GetTopic()

tosend := make(map[peer.ID]struct{})
for _, topic := range msg.GetTopicIDs() {
// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
continue
}

if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
// any peers in the topic?
tmap, ok := gs.p.topics[topic]
if !ok {
return
}

if gs.floodPublish && from == gs.p.host.ID() {
for p := range tmap {
_, direct := gs.direct[p]
if direct || gs.score.Score(p) >= gs.publishThreshold {
tosend[p] = struct{}{}
}
continue
}

} else {
// direct peers
for p := range gs.direct {
_, inTopic := tmap[p]
Expand Down
8 changes: 4 additions & 4 deletions gossipsub_spam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,10 +662,10 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
// fail validation and reduce the attacker's score)
for i := 0; i < 100; i++ {
msg := &pb.Message{
Data: []byte("some data" + strconv.Itoa(i)),
TopicIDs: []string{mytopic},
From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)},
Data: []byte("some data" + strconv.Itoa(i)),
Topic: &mytopic,
From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)},
}
writeMsg(&pb.RPC{
Publish: []*pb.Message{msg},
Expand Down
13 changes: 5 additions & 8 deletions mcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction) {
}

type CacheEntry struct {
mid string
topics []string
mid string
topic string
}

func (mc *MessageCache) Put(msg *pb.Message) {
mid := mc.msgID(msg)
mc.msgs[mid] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topics: msg.GetTopicIDs()})
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()})
}

func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
Expand Down Expand Up @@ -83,11 +83,8 @@ func (mc *MessageCache) GetGossipIDs(topic string) []string {
var mids []string
for _, entries := range mc.history[:mc.gossip] {
for _, entry := range entries {
for _, t := range entry.topics {
if t == topic {
mids = append(mids, entry.mid)
break
}
if entry.topic == topic {
mids = append(mids, entry.mid)
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions mcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ func makeTestMessage(n int) *pb.Message {
seqno := make([]byte, 8)
binary.BigEndian.PutUint64(seqno, uint64(n))
data := []byte(fmt.Sprintf("%d", n))
topic := "test"
return &pb.Message{
Data: data,
TopicIDs: []string{"test"},
From: []byte("test"),
Seqno: seqno,
Data: data,
Topic: &topic,
From: []byte("test"),
Seqno: seqno,
}
}
123 changes: 60 additions & 63 deletions pb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d6c20b5

Please sign in to comment.