From 4656c16921903d9513b55abf428ed534615dc227 Mon Sep 17 00:00:00 2001 From: Pop Chunhapanya Date: Sat, 21 Dec 2024 23:21:05 +0700 Subject: [PATCH] Gossipsub v2.0: Handle IANNOUNCE and send INEED --- acache.go | 217 +++++++++++++++++++++++ acache_test.go | 443 ++++++++++++++++++++++++++++++++++++++++++++++ gossipsub.go | 51 +++++- gossipsub_test.go | 233 ++++++++++++++++++++++++ 4 files changed, 943 insertions(+), 1 deletion(-) create mode 100644 acache.go create mode 100644 acache_test.go diff --git a/acache.go b/acache.go new file mode 100644 index 00000000..0ca66d64 --- /dev/null +++ b/acache.go @@ -0,0 +1,217 @@ +package pubsub + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" +) + +type IneedMeta struct { + pid peer.ID + mid string +} + +type sendList struct { + // Timeout + t time.Duration + // List of message ids + l *list.List + // Elements in l indexed by message ids. + es map[string]*list.Element +} + +type sendListEntry struct { + meta *IneedMeta + // Send time + sendTime time.Time + // Timeout time + expiryTime time.Time +} + +func newSendList(timeout time.Duration) *sendList { + return &sendList{ + t: timeout, + l: list.New(), + es: make(map[string]*list.Element), + } +} + +// Front returns the first message id in the list. +func (sl *sendList) Front() *sendListEntry { + e := sl.l.Front() + if e != nil { + entry := e.Value.(sendListEntry) + return &entry + } else { + return nil + } +} + +// Push pushes the message id and the peer to the list with send time set to now and expiry time set to now plus timeout. +func (sl *sendList) Push(meta *IneedMeta) { + // there shouldn't already be a message id in the list + if _, ok := sl.es[meta.mid]; ok { + panic(fmt.Errorf("there is already a message id in the list: %s", meta.mid)) + } + // push to the back and remember the element + sl.es[meta.mid] = sl.l.PushBack(sendListEntry{ + meta: meta, + sendTime: time.Now(), + expiryTime: time.Now().Add(sl.t), + }) +} + +// Remove removes the message id from the list. +func (sl *sendList) Remove(mid string) { + // there shouldn already be a message id in the list + if _, ok := sl.es[mid]; !ok { + panic(fmt.Errorf("there is no message id in the list to remove: %s", mid)) + } + // remove it from both the list and the indexing map + sl.l.Remove(sl.es[mid]) + delete(sl.es, mid) +} + +// Has checks if the message id is in the list. +func (sl *sendList) Has(mid string) bool { + _, ok := sl.es[mid] + return ok +} + +type AnnounceCache struct { + lk sync.RWMutex + // Queues indexed by messages ids containing the peers from whom we already receive IANNOUNCE, but not yet send INEED. + m map[string][]peer.ID + // List of pairs of peers and message ids that we already send INEED, but the timeout hasn't occured and the message is not received yet. + // There is supposed to be at most one element per message id in the list at any time. + sl *sendList + // Channel to wake up the background routine and try to send INEED. + c chan<- *IneedMeta + // Channel used to notify a request to send INEED from the cache. + R <-chan *IneedMeta + // Channel used to notify a timeout of INEED from the cache. + T <-chan *IneedMeta + // Used to indicate that the cache is stopped + stopped chan struct{} +} + +func NewAnnounceCache(timeout time.Duration) *AnnounceCache { + c := make(chan *IneedMeta) + R := make(chan *IneedMeta) + T := make(chan *IneedMeta) + ac := &AnnounceCache{ + c: c, + R: R, + T: T, + m: make(map[string][]peer.ID), + sl: newSendList(timeout), + + stopped: make(chan struct{}), + } + go ac.background(c, R, T) + + return ac +} + +func (ac *AnnounceCache) background(c <-chan *IneedMeta, R chan<- *IneedMeta, T chan<- *IneedMeta) { + timer := time.NewTimer(0) + for { + select { + case <-ac.stopped: + return + case meta := <-c: + ac.lk.Lock() + if !ac.sl.Has(meta.mid) { + // If there is no INEED on flight, just send INEED right away by putting it in the list + ac.sl.Push(meta) + // Send the meta data to the cache user, so they can send INEED using that + select { + case R <- meta: + case <-ac.stopped: + ac.lk.Unlock() + return + } + } else { + ac.m[meta.mid] = append(ac.m[meta.mid], meta.pid) + } + case <-timer.C: + ac.lk.Lock() + } + entry := ac.sl.Front() + for entry != nil && entry.expiryTime.Before(time.Now()) { + // If the ongoing INEED times out + mid := entry.meta.mid + + // Remove it from the list + ac.sl.Remove(mid) + + // Notify the cache user that the ongoing INEED times out + select { + case T <- entry.meta: + case <-ac.stopped: + ac.lk.Unlock() + return + } + + // If there is another peer waiting for INEED + if len(ac.m[mid]) > 0 { + meta := &IneedMeta{ + pid: ac.m[mid][0], + mid: mid, + } + ac.m[mid] = ac.m[mid][1:] + ac.sl.Push(meta) + + // Send the meta data to the cache user, so they can send INEED using that + select { + case R <- meta: + case <-ac.stopped: + ac.lk.Unlock() + return + } + } else { + delete(ac.m, mid) + } + + // Look at the next entry + entry = ac.sl.Front() + } + timer.Stop() + if entry = ac.sl.Front(); entry != nil { + // If there still the next entry, wake this background routine correspondingly + timer.Reset(entry.expiryTime.Sub(time.Now())) + } + ac.lk.Unlock() + } +} + +func (ac *AnnounceCache) Add(mid string, pid peer.ID) { + meta := &IneedMeta{ + mid: mid, + pid: pid, + } + select { + case ac.c <- meta: + case <-ac.stopped: + } +} + +// Clear clears all the pending IANNOUNCE and remove the ongoing INEED from the list so the the timeout +// will not be triggered +func (ac *AnnounceCache) Clear(mid string) { + ac.lk.Lock() + defer ac.lk.Unlock() + + // Clear the cache for the given message id + ac.m[mid] = []peer.ID{} + if ac.sl.Has(mid) { + ac.sl.Remove(mid) + } +} + +func (ac *AnnounceCache) Stop() { + close(ac.stopped) +} diff --git a/acache_test.go b/acache_test.go new file mode 100644 index 00000000..84d9892b --- /dev/null +++ b/acache_test.go @@ -0,0 +1,443 @@ +package pubsub + +import ( + "github.com/libp2p/go-libp2p/core/peer" + + "testing" + "time" +) + +type Event struct { + // True if it's a send event, false if it's a timeout event + send bool + meta *IneedMeta + time time.Time +} + +func closeTimes(a time.Time, b time.Time) bool { + return a.Sub(b) < 2*time.Millisecond && b.Sub(a) < 2*time.Millisecond +} + +func TestAnnounceCache(t *testing.T) { + var events []Event + timeout := 50 * time.Millisecond + ac := NewAnnounceCache(timeout) + pidA := peer.ID("A") + pidB := peer.ID("B") + pidC := peer.ID("C") + pidD := peer.ID("D") + + done := make(chan struct{}) + go func() { + timer := time.After(200 * time.Millisecond) + for { + select { + case meta := <-ac.T: + events = append(events, Event{ + send: false, + meta: meta, + time: time.Now(), + }) + case meta := <-ac.R: + events = append(events, Event{ + send: true, + meta: meta, + time: time.Now(), + }) + case <-timer: + done <- struct{}{} + return + } + } + }() + + start := time.Now() + ac.Add("mid1", pidA) + ac.Add("mid1", pidB) + ac.Add("mid2", pidC) + ac.Add("mid2", pidD) + + <-done + + // Check the number of events + if len(events) != 8 { + t.Fatal("incorrect number of events") + } + msgList := map[string][]peer.ID{ + "mid1": []peer.ID{pidA, pidB}, + "mid2": []peer.ID{pidC, pidD}, + } + sentTime := make(map[string]time.Time) + expiryTime := make(map[string]time.Time) + for _, event := range events { + if event.send { + if _, ok := sentTime[event.meta.mid]; ok { + t.Fatal("there shouldn't be an ongoing INEED when send event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + expiryT, ok := expiryTime[event.meta.mid] + var expectedSentTime time.Time + if ok { + expectedSentTime = expiryT + } else { + expectedSentTime = start + } + if !closeTimes(expectedSentTime, event.time) { + t.Fatal("send event is not sent timely") + } + sentTime[event.meta.mid] = event.time + } else { + sentT, ok := sentTime[event.meta.mid] + if !ok { + t.Fatal("there shouldn be an ongoing INEED when timeout event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + if !closeTimes(sentT.Add(timeout), event.time) { + t.Fatal("timeout event is not sent timely") + } + delete(sentTime, event.meta.mid) + expiryTime[event.meta.mid] = event.time + msgList[event.meta.mid] = msgList[event.meta.mid][1:] + } + } + // Make sure that all send events have corresponding timeout events + if len(sentTime) != 0 { + t.Fatal("there is some send event that doesn't have corresponding timeout event") + } + // Make sure that all peer ids are consumed + for mid, list := range msgList { + if len(list) != 0 { + t.Fatalf("%s has some peer id that doesn't have events", mid) + } + } +} + +func TestAnnounceCacheClear(t *testing.T) { + var events []Event + timeout := 50 * time.Millisecond + ac := NewAnnounceCache(timeout) + pidA := peer.ID("A") + pidB := peer.ID("B") + pidC := peer.ID("C") + pidD := peer.ID("D") + + done := make(chan struct{}) + go func() { + timer := time.After(200 * time.Millisecond) + for { + select { + case meta := <-ac.T: + events = append(events, Event{ + send: false, + meta: meta, + time: time.Now(), + }) + case meta := <-ac.R: + events = append(events, Event{ + send: true, + meta: meta, + time: time.Now(), + }) + case <-timer: + done <- struct{}{} + return + } + } + }() + + go func() { + time.Sleep(80 * time.Millisecond) + ac.Clear("mid1") + }() + + start := time.Now() + ac.Add("mid1", pidA) + ac.Add("mid1", pidB) + ac.Add("mid1", pidC) + ac.Add("mid2", pidD) + + <-done + + // Check the number of events + if len(events) != 5 { + t.Fatal("incorrect number of events") + } + msgList := map[string][]peer.ID{ + "mid1": []peer.ID{pidA, pidB}, + "mid2": []peer.ID{pidD}, + } + sentTime := make(map[string]time.Time) + expiryTime := make(map[string]time.Time) + for _, event := range events { + if event.send { + if _, ok := sentTime[event.meta.mid]; ok { + t.Fatal("there shouldn't be an ongoing INEED when send event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + expiryT, ok := expiryTime[event.meta.mid] + var expectedSentTime time.Time + if ok { + expectedSentTime = expiryT + } else { + expectedSentTime = start + } + if !closeTimes(expectedSentTime, event.time) { + t.Fatal("send event is not sent timely") + } + sentTime[event.meta.mid] = event.time + } else { + sentT, ok := sentTime[event.meta.mid] + if !ok { + t.Fatal("there shouldn be an ongoing INEED when timeout event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + if !closeTimes(sentT.Add(timeout), event.time) { + t.Fatal("timeout event is not sent timely") + } + delete(sentTime, event.meta.mid) + expiryTime[event.meta.mid] = event.time + msgList[event.meta.mid] = msgList[event.meta.mid][1:] + } + } + // Make sure that there is only one send event that doesn't have corresponding timeout event + if len(sentTime) != 1 { + t.Fatal("there should only be one send event that doesn't have corresponding timeout event") + } + if _, ok := sentTime["mid1"]; !ok { + t.Fatal("One send event that doesn't have corresponding timeout event should be mid1") + } + // Make sure that all peer ids are consumed + for mid, list := range msgList { + if mid == "mid1" { + if len(list) != 1 || list[0] != pidB { + t.Fatal("there should be only pidB that isn't consumed and has no timeout event") + } + } else { + if len(list) != 0 { + t.Fatalf("%s has some peer id that doesn't have events", mid) + } + } + } +} + +func TestAnnounceCacheReAdd(t *testing.T) { + var events []Event + timeout := 50 * time.Millisecond + ac := NewAnnounceCache(timeout) + pidA := peer.ID("A") + pidB := peer.ID("B") + pidC := peer.ID("C") + pidD := peer.ID("D") + + done := make(chan struct{}) + go func() { + timer := time.After(300 * time.Millisecond) + for { + select { + case meta := <-ac.T: + events = append(events, Event{ + send: false, + meta: meta, + time: time.Now(), + }) + case meta := <-ac.R: + events = append(events, Event{ + send: true, + meta: meta, + time: time.Now(), + }) + case <-timer: + done <- struct{}{} + return + } + } + }() + + start := time.Now() + ac.Add("mid1", pidA) + ac.Add("mid1", pidB) + ac.Add("mid2", pidC) + time.Sleep(220 * time.Millisecond) + start2 := time.Now() + ac.Add("mid1", pidD) + + <-done + + // Check the number of events + if len(events) != 8 { + t.Fatal("incorrect number of events") + } + msgList := map[string][]peer.ID{ + "mid1": []peer.ID{pidA, pidB, pidD}, + "mid2": []peer.ID{pidC}, + } + sentTime := make(map[string]time.Time) + expiryTime := make(map[string]time.Time) + for _, event := range events { + if event.send { + if _, ok := sentTime[event.meta.mid]; ok { + t.Fatal("there shouldn't be an ongoing INEED when send event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + expiryT, ok := expiryTime[event.meta.mid] + var expectedSentTime time.Time + if event.meta.pid == pidD { + expectedSentTime = start2 + } else if ok { + expectedSentTime = expiryT + } else { + expectedSentTime = start + } + if !closeTimes(expectedSentTime, event.time) { + t.Fatal("send event is not sent timely") + } + sentTime[event.meta.mid] = event.time + } else { + sentT, ok := sentTime[event.meta.mid] + if !ok { + t.Fatal("there shouldn be an ongoing INEED when timeout event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + if !closeTimes(sentT.Add(timeout), event.time) { + t.Fatal("timeout event is not sent timely") + } + delete(sentTime, event.meta.mid) + expiryTime[event.meta.mid] = event.time + msgList[event.meta.mid] = msgList[event.meta.mid][1:] + } + } + // Make sure that all send events have corresponding timeout events + if len(sentTime) != 0 { + t.Fatal("there is some send event that doesn't have corresponding timeout event") + } + // Make sure that all peer ids are consumed + for mid, list := range msgList { + if len(list) != 0 { + t.Fatalf("%s has some peer id that doesn't have events", mid) + } + } +} + +func TestAnnounceCacheStop(t *testing.T) { + var events []Event + timeout := 50 * time.Millisecond + ac := NewAnnounceCache(timeout) + pidA := peer.ID("A") + pidB := peer.ID("B") + pidC := peer.ID("C") + pidD := peer.ID("D") + + done := make(chan struct{}) + go func() { + timer := time.After(200 * time.Millisecond) + for { + select { + case meta := <-ac.T: + events = append(events, Event{ + send: false, + meta: meta, + time: time.Now(), + }) + case meta := <-ac.R: + events = append(events, Event{ + send: true, + meta: meta, + time: time.Now(), + }) + case <-timer: + done <- struct{}{} + return + } + } + }() + + go func() { + time.Sleep(80 * time.Millisecond) + ac.Stop() + }() + + start := time.Now() + ac.Add("mid1", pidA) + ac.Add("mid1", pidB) + ac.Add("mid1", pidC) + ac.Add("mid2", pidD) + + <-done + + // Check the number of events + if len(events) != 5 { + t.Fatal("incorrect number of events") + } + msgList := map[string][]peer.ID{ + "mid1": []peer.ID{pidA, pidB}, + "mid2": []peer.ID{pidD}, + } + sentTime := make(map[string]time.Time) + expiryTime := make(map[string]time.Time) + for _, event := range events { + if event.send { + if _, ok := sentTime[event.meta.mid]; ok { + t.Fatal("there shouldn't be an ongoing INEED when send event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + expiryT, ok := expiryTime[event.meta.mid] + var expectedSentTime time.Time + if ok { + expectedSentTime = expiryT + } else { + expectedSentTime = start + } + if !closeTimes(expectedSentTime, event.time) { + t.Fatal("send event is not sent timely") + } + sentTime[event.meta.mid] = event.time + } else { + sentT, ok := sentTime[event.meta.mid] + if !ok { + t.Fatal("there shouldn be an ongoing INEED when timeout event is received") + } + if msgList[event.meta.mid][0] != event.meta.pid { + t.Fatal("wrong peer id in the send event") + } + if !closeTimes(sentT.Add(timeout), event.time) { + t.Fatal("timeout event is not sent timely") + } + delete(sentTime, event.meta.mid) + expiryTime[event.meta.mid] = event.time + msgList[event.meta.mid] = msgList[event.meta.mid][1:] + } + } + // Make sure that there is only one send event that doesn't have corresponding timeout event + if len(sentTime) != 1 { + t.Fatal("there should only be one send event that doesn't have corresponding timeout event") + } + if _, ok := sentTime["mid1"]; !ok { + t.Fatal("One send event that doesn't have corresponding timeout event should be mid1") + } + // Make sure that all peer ids are consumed + for mid, list := range msgList { + if mid == "mid1" { + if len(list) != 1 || list[0] != pidB { + t.Fatal("there should be only pidB that isn't consumed and has no timeout event") + } + } else { + if len(list) != 0 { + t.Fatalf("%s has some peer id that doesn't have events", mid) + } + } + } +} diff --git a/gossipsub.go b/gossipsub.go index f9957655..4de9e47d 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -282,6 +282,7 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter { outbound: make(map[peer.ID]bool), connect: make(chan connectInfo, params.MaxPendingConnections), cab: pstoremem.NewAddrBook(), + acache: NewAnnounceCache(params.Timeout), mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength), protos: GossipSubDefaultProtocols, feature: GossipSubDefaultFeatures, @@ -494,6 +495,7 @@ type GossipSubRouter struct { protos []protocol.ID feature GossipSubFeatureTest + acache *AnnounceCache mcache *MessageCache tracer *pubsubTracer score *peerScore @@ -571,6 +573,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { // Manage our address book from events emitted by libp2p go gs.manageAddrBook() + // Manage events from the announce cache + go gs.manageAnnounceCache() + // connect to direct peers if len(gs.direct) > 0 { go func() { @@ -631,6 +636,22 @@ func (gs *GossipSubRouter) manageAddrBook() { } } +func (gs *GossipSubRouter) manageAnnounceCache() { + for { + select { + case <-gs.p.ctx.Done(): + gs.acache.Stop() + return + case r := <-gs.acache.R: + ineed := []*pb.ControlINeed{{MessageID: &r.mid}} + out := rpcWithControl(nil, nil, nil, nil, nil, nil, nil, ineed) + gs.sendRPC(r.pid, out, false) + case _ = <-gs.acache.T: + // TODO: penalize peers when timeout + } + } +} + func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { log.Debugf("PEERUP: Add new peer %s using %s", p, proto) gs.tracer.AddPeer(p, proto) @@ -720,6 +741,8 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus { // PreValidation sends the IDONTWANT control messages to all the mesh // peers. They need to be sent right before the validation because they // should be seen by the peers as soon as possible. +// +// It also clear the announce cache to prevent sending further INEEDs. func (gs *GossipSubRouter) PreValidation(msgs []*Message) { tmids := make(map[string][]string) for _, msg := range msgs { @@ -727,7 +750,10 @@ func (gs *GossipSubRouter) PreValidation(msgs []*Message) { continue } topic := msg.GetTopic() - tmids[topic] = append(tmids[topic], gs.p.idGen.ID(msg)) + mid := gs.p.idGen.ID(msg) + tmids[topic] = append(tmids[topic], mid) + // Clear the announce cache + gs.acache.Clear(mid) } for topic, mids := range tmids { if len(mids) == 0 { @@ -756,6 +782,7 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { iwant := gs.handleIHave(rpc.from, ctl) ihave := gs.handleIWant(rpc.from, ctl) prune := gs.handleGraft(rpc.from, ctl) + gs.handleIAnnounce(rpc.from, ctl) gs.handlePrune(rpc.from, ctl) gs.handleIDontWant(rpc.from, ctl) @@ -1036,6 +1063,28 @@ func (gs *GossipSubRouter) handleIDontWant(p peer.ID, ctl *pb.ControlMessage) { } } +func (gs *GossipSubRouter) handleIAnnounce(p peer.ID, ctl *pb.ControlMessage) { + for _, iannounce := range ctl.GetIannounce() { + gmap, ok := gs.mesh[iannounce.GetTopicID()] + if !ok { + // Not in mesh, no need ot handle IANNOUNCE + continue + } + _, isMesh := gmap[p] + _, isDirect := gs.direct[p] + // We handle IANNOUNCE only from mesh peers or direct peers + if !isMesh && !isDirect { + continue + } + + mid := iannounce.GetMessageID() + if !gs.p.seenMessage(mid) { + // If the message has not been seen, add it to the announce cache + gs.acache.Add(mid, p) + } + } +} + func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) { backoff := gs.params.PruneBackoff if isUnsubscribe { diff --git a/gossipsub_test.go b/gossipsub_test.go index 3396816b..9907f3ee 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3634,6 +3634,239 @@ func TestGossipsubIannounceFanout(t *testing.T) { <-ctx.Done() } +// Test that INEED is sent to mesh peers +func TestGossipsubIneedMeshPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + params := DefaultGossipSubParams() + params.Dannounce = params.D + psub := getGossipsub(ctx, hosts[0], WithGossipSubParams(params), WithMessageIdFn(msgID)) + _, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + // Wait a bit after the last message before checking we got the right messages + msgTimer := time.NewTimer(1 * time.Second) + + // Checks we received the right messages + ineedCount := 0 + checkMsgs := func() { + if ineedCount != 1 { + t.Fatalf("Expected 1 INEED received, got %d", ineedCount) + } + } + + // Wait for the timer to expire + go func() { + select { + case <-msgTimer.C: + checkMsgs() + cancel() + return + case <-ctx.Done(): + checkMsgs() + } + }() + + var mid string + newMockGS(ctx, t, hosts[1], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // When the first peer connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and grafting to the first peer + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}}, + }) + + go func() { + // Wait for a short interval to make sure the first peer + // received and processed the subscribe + graft + time.Sleep(100 * time.Millisecond) + // Send IANNOUNCE + mid = msgID(&pb.Message{Data: []byte("mymessage")}) + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Iannounce: []*pb.ControlIAnnounce{{TopicID: sub.Topicid, MessageID: &mid}}}, + }) + }() + } + } + ineedCount += len(irpc.GetControl().GetIneed()) + if len(irpc.GetControl().GetIneed()) > 0 { + ineed_mid := irpc.GetControl().GetIneed()[0].GetMessageID() + if mid != ineed_mid { + t.Fatalf("Wrong message id in INEED expected %s got %s", mid, ineed_mid) + } + } + }) + + connect(t, hosts[0], hosts[1]) + + <-ctx.Done() +} + +// Test that INEED is sent to direct peers +func TestGossipsubIneedDirectPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + params := DefaultGossipSubParams() + params.Dannounce = params.D + psub := getGossipsub(ctx, hosts[0], + WithGossipSubParams(params), + WithMessageIdFn(msgID), + WithDirectPeers([]peer.AddrInfo{{ID: hosts[1].ID(), Addrs: hosts[1].Addrs()}})) + _, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + // Wait a bit after the last message before checking we got the right messages + msgTimer := time.NewTimer(1 * time.Second) + + // Checks we received the right messages + ineedCount := 0 + checkMsgs := func() { + if ineedCount != 1 { + t.Fatalf("Expected 1 INEED received, got %d", ineedCount) + } + } + + // Wait for the timer to expire + go func() { + select { + case <-msgTimer.C: + checkMsgs() + cancel() + return + case <-ctx.Done(): + checkMsgs() + } + }() + + var mid string + newMockGS(ctx, t, hosts[1], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // When the first peer connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and grafting to the first peer + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + }) + + go func() { + // Wait for a short interval to make sure the first peer + // received and processed the subscribe + graft + time.Sleep(100 * time.Millisecond) + // Send IANNOUNCE + mid = msgID(&pb.Message{Data: []byte("mymessage")}) + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Iannounce: []*pb.ControlIAnnounce{{TopicID: sub.Topicid, MessageID: &mid}}}, + }) + }() + } + } + ineedCount += len(irpc.GetControl().GetIneed()) + if len(irpc.GetControl().GetIneed()) > 0 { + ineed_mid := irpc.GetControl().GetIneed()[0].GetMessageID() + if mid != ineed_mid { + t.Fatalf("Wrong message id in INEED expected %s got %s", mid, ineed_mid) + } + } + }) + + connect(t, hosts[0], hosts[1]) + + <-ctx.Done() +} + +// Test that INEED is not sent to indirect non-mesh peers +func TestGossipsubIneedIndirectNonmeshPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 2) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + params := DefaultGossipSubParams() + params.Dannounce = params.D + psub := getGossipsub(ctx, hosts[0], WithGossipSubParams(params), WithMessageIdFn(msgID)) + _, err := psub.Subscribe("foobar") + if err != nil { + t.Fatal(err) + } + + // Wait a bit after the last message before checking we got the right messages + msgTimer := time.NewTimer(1 * time.Second) + + // Checks we received the right messages + ineedCount := 0 + checkMsgs := func() { + if ineedCount != 0 { + t.Fatalf("Expected no INEED received, got %d", ineedCount) + } + } + + // Wait for the timer to expire + go func() { + select { + case <-msgTimer.C: + checkMsgs() + cancel() + return + case <-ctx.Done(): + checkMsgs() + } + }() + + newMockGS(ctx, t, hosts[1], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // When the first peer connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and pruning to the first peer to make sure + // that it's not in the mesh + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Prune: []*pb.ControlPrune{{TopicID: sub.Topicid}}}, + }) + + go func() { + // Wait for a short interval to make sure the first peer + // received and processed the subscribe + graft + time.Sleep(100 * time.Millisecond) + // Send IANNOUNCE + mid := msgID(&pb.Message{Data: []byte("mymessage")}) + writeMsg(&pb.RPC{ + Control: &pb.ControlMessage{Iannounce: []*pb.ControlIAnnounce{{TopicID: sub.Topicid, MessageID: &mid}}}, + }) + }() + } + } + ineedCount += len(irpc.GetControl().GetIneed()) + }) + + connect(t, hosts[0], hosts[1]) + + <-ctx.Done() +} + func BenchmarkAllocDoDropRPC(b *testing.B) { gs := GossipSubRouter{tracer: &pubsubTracer{}}