Skip to content

Commit

Permalink
Switch to the new peer notify mechanism (libp2p#564)
Browse files Browse the repository at this point in the history
1. Only listen for peers added and identify events.
2. Remove the old "Limited" check. Peers only show up as "Connected" if
they have non-limited connections.
3. Don't bother listening for new connections directly and/or
connectivity changes. We'll get a new identify event per new connection
regardless.

fixes libp2p#546
  • Loading branch information
Stebalien authored Jul 11, 2024
1 parent 1f5b81f commit b23b3ee
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 78 deletions.
75 changes: 0 additions & 75 deletions notify.go

This file was deleted.

112 changes: 112 additions & 0 deletions peer_notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package pubsub

import (
"context"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

func (ps *PubSub) watchForNewPeers(ctx context.Context) {
// We don't bother subscribing to "connectivity" events because we always run identify after
// every new connection.
sub, err := ps.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerProtocolsUpdated{},
})
if err != nil {
log.Errorf("failed to subscribe to peer identification events: %v", err)
return
}
defer sub.Close()

ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
for _, pid := range ps.host.Network().Peers() {
if ps.host.Network().Connectedness(pid) != network.Connected {
continue
}
ps.newPeersPend[pid] = struct{}{}
}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()

select {
case ps.newPeers <- struct{}{}:
default:
}

var supportsProtocol func(protocol.ID) bool
if ps.protoMatchFunc != nil {
var supportedProtocols []func(protocol.ID) bool
for _, proto := range ps.rt.Protocols() {

supportedProtocols = append(supportedProtocols, ps.protoMatchFunc(proto))
}
supportsProtocol = func(proto protocol.ID) bool {
for _, fn := range supportedProtocols {
if (fn)(proto) {
return true
}
}
return false
}
} else {
supportedProtocols := make(map[protocol.ID]struct{})
for _, proto := range ps.rt.Protocols() {
supportedProtocols[proto] = struct{}{}
}
supportsProtocol = func(proto protocol.ID) bool {
_, ok := supportedProtocols[proto]
return ok
}
}

for ctx.Err() == nil {
var ev any
select {
case <-ctx.Done():
return
case ev = <-sub.Out():
}

var protos []protocol.ID
var peer peer.ID
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
peer = ev.Peer
protos = ev.Protocols
case event.EvtPeerProtocolsUpdated:
peer = ev.Peer
protos = ev.Added
default:
continue
}

// We don't bother checking connectivity (connected and non-"limited") here because
// we'll check when actually handling the new peer.

for _, p := range protos {
if supportsProtocol(p) {
ps.notifyNewPeer(peer)
break
}
}
}

}

func (ps *PubSub) notifyNewPeer(peer peer.ID) {
ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
ps.newPeersPend[peer] = struct{}{}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()

select {
case ps.newPeers <- struct{}{}:
default:
}
}
6 changes: 3 additions & 3 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
h.SetStreamHandler(id, ps.handleNewStream)
}
}
h.Network().Notify((*PubSubNotif)(ps))
go ps.watchForNewPeers(ctx)

ps.val.Start(ps)

go ps.processLoop(ctx)

(*PubSubNotif)(ps).Initialize()

return ps, nil
}

Expand Down Expand Up @@ -687,6 +685,8 @@ func (p *PubSub) handlePendingPeers() {
p.newPeersPrioLk.Unlock()

for pid := range newPeers {
// Make sure we have a non-limited connection. We do this late because we may have
// disconnected in the meantime.
if p.host.Network().Connectedness(pid) != network.Connected {
continue
}
Expand Down

0 comments on commit b23b3ee

Please sign in to comment.