diff --git a/blacklist_test.go b/blacklist_test.go index 5b0bcc9f..9710e9c8 100644 --- a/blacklist_test.go +++ b/blacklist_test.go @@ -17,7 +17,6 @@ func TestMapBlacklist(t *testing.T) { if !b.Contains(p) { t.Fatal("peer not in the blacklist") } - } func TestTimeCachedBlacklist(t *testing.T) { diff --git a/pubsub.go b/pubsub.go index 2853cb98..bd7da1c7 100644 --- a/pubsub.go +++ b/pubsub.go @@ -493,8 +493,8 @@ func (p *PubSub) processLoop(ctx context.Context) { case s := <-p.newPeerStream: pid := s.Conn().RemotePeer() - ch, ok := p.peers[pid] + if !ok { log.Warn("new stream for unknown peer: ", pid) s.Reset() @@ -504,6 +504,7 @@ func (p *PubSub) processLoop(ctx context.Context) { if p.blacklist.Contains(pid) { log.Warn("closing stream for blacklisted peer: ", pid) close(ch) + delete(p.peers, pid) s.Reset() continue } diff --git a/pubsub_test.go b/pubsub_test.go new file mode 100644 index 00000000..4a033159 --- /dev/null +++ b/pubsub_test.go @@ -0,0 +1,49 @@ +package pubsub + +import ( + "context" + "testing" + "time" +) + +// See https://github.com/libp2p/go-libp2p-pubsub/issues/426 +func TestPubSubRemovesBlacklistedPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + hosts := getNetHosts(t, ctx, 2) + + bl := NewMapBlacklist() + + psubs0 := getPubsub(ctx, hosts[0]) + psubs1 := getPubsub(ctx, hosts[1], WithBlacklist(bl)) + connect(t, hosts[0], hosts[1]) + + // Bad peer is blacklisted after it has connected. + // Calling p.BlacklistPeer directly does the right thing but we should also clean + // up the peer if it has been added the the blacklist by another means. + bl.Add(hosts[0].ID()) + + _, err := psubs0.Subscribe("test") + if err != nil { + t.Fatal(err) + } + + sub1, err := psubs1.Subscribe("test") + if err != nil { + t.Fatal(err) + } + + time.Sleep(time.Millisecond * 100) + + psubs0.Publish("test", []byte("message")) + + wctx, cancel2 := context.WithTimeout(ctx, 1*time.Second) + defer cancel2() + + _, _ = sub1.Next(wctx) + + // Explicitly cancel context so PubSub cleans up peer channels. + // Issue 426 reports a panic due to a peer channel being closed twice. + cancel() + time.Sleep(time.Millisecond * 100) +}