Skip to content

Commit

Permalink
deduplicate inbound streams
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Jan 12, 2021
1 parent 352c6b9 commit 8676a0e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
20 changes: 19 additions & 1 deletion comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ func (p *PubSub) getHelloPacket() *RPC {
}

func (p *PubSub) handleNewStream(s network.Stream) {
peer := s.Conn().RemotePeer()

p.inboundStreamsMx.Lock()
other, dup := p.inboundStreams[peer]
if dup {
log.Debugf("duplicate inbound stream from %s; resetting other stream", peer)
other.Reset()
}
p.inboundStreams[peer] = s
p.inboundStreamsMx.Unlock()

r := protoio.NewDelimitedReader(s, p.maxMessageSize)
for {
rpc := new(RPC)
Expand All @@ -54,10 +65,17 @@ func (p *PubSub) handleNewStream(s network.Stream) {
// but it doesn't hurt to send it.
s.Close()
}

p.inboundStreamsMx.Lock()
if p.inboundStreams[peer] == s {
delete(p.inboundStreams, peer)
}
p.inboundStreamsMx.Unlock()

return
}

rpc.from = s.Conn().RemotePeer()
rpc.from = peer
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
Expand Down
4 changes: 4 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ type PubSub struct {

peers map[peer.ID]chan *RPC

inboundStreamsMx sync.Mutex
inboundStreams map[peer.ID]network.Stream

seenMessagesMx sync.Mutex
seenMessages *timecache.TimeCache

Expand Down Expand Up @@ -253,6 +256,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
myRelays: make(map[string]int),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
inboundStreams: make(map[peer.ID]network.Stream),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
Expand Down

0 comments on commit 8676a0e

Please sign in to comment.