Reuse sync client unless peer addresses changed
A new sync client was being created for each advertisement whose entries were synced. This was not a problem for plain HTTP, since the HTTP client was reused, but with libp2phttp a new namespaced client was created each time, establishing another connection between the indexer and the index-provider. Long advertisement chains could exhaust network resources and cause indexing to grind to a halt.

This PR fixes the problem by reusing each index-provider's sync client to sync both advertisement chains and entries chains. The sync client is kept with the index-provider's dagsync subscription handler, and a new sync client is created only if none exists yet or if the index-provider's addresses have changed.
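In outline, the reuse check works as in this minimal sketch. The handler, Syncer, subscriber, and SameAddrs names match the diff below; the syncerFor helper name and the omission of the dtsync fallback are simplifications for illustration only.

// Sketch only: keep one sync client per publisher on its dagsync handler,
// and build a new one only when no client exists yet or the publisher's
// addresses have changed.
func (h *handler) syncerFor(peerInfo peer.AddrInfo) (Syncer, error) {
    if h.syncer != nil && h.syncer.SameAddrs(peerInfo.Addrs) {
        // Reuse the existing client and its underlying connection.
        return h.syncer, nil
    }
    syncer, err := h.subscriber.ipniSync.NewSyncer(peerInfo)
    if err != nil {
        return nil, err
    }
    h.syncer = syncer
    return h.syncer, nil
}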
gammazero committed Mar 8, 2024
1 parent fd4c85f commit 1f29049
Showing 8 changed files with 152 additions and 49 deletions.
4 changes: 2 additions & 2 deletions dagsync/dtsync/sync.go
@@ -102,9 +102,9 @@ func (s *Sync) Close() error {
}

// NewSyncer creates a new Syncer to use for a single sync operation against a peer.
func (s *Sync) NewSyncer(peerID peer.ID, topicName string) *Syncer {
func (s *Sync) NewSyncer(peerInfo peer.AddrInfo, topicName string) *Syncer {
return &Syncer{
peerID: peerID,
peerInfo: peerInfo,
sync: s,
topicName: topicName,
ls: s.ls,
20 changes: 13 additions & 7 deletions dagsync/dtsync/syncer.go
@@ -13,20 +13,26 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/dagsync/dtsync/head"
"github.com/ipni/go-libipni/mautil"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

// Syncer handles a single sync with a provider.
type Syncer struct {
peerID peer.ID
peerInfo peer.AddrInfo
sync *Sync
ls *ipld.LinkSystem
topicName string
}

// GetHead queries a provider for the latest CID.
func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return head.QueryRootCid(ctx, s.sync.host, s.topicName, s.peerID)
return head.QueryRootCid(ctx, s.sync.host, s.topicName, s.peerInfo.ID)
}

func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool {
return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs)
}

// Sync opens a datatransfer data channel and uses the selector to pull data
@@ -42,21 +48,21 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
// help with determining what the "next" CID would be if a DAG is partially
// present. Similar to what SegmentSyncActions does.
if cids, ok := s.has(ctx, nextCid, sel); ok {
s.sync.signalLocallyFoundCids(s.peerID, cids)
inProgressSyncK := inProgressSyncKey{nextCid, s.peerID}
s.sync.signalLocallyFoundCids(s.peerInfo.ID, cids)
inProgressSyncK := inProgressSyncKey{nextCid, s.peerInfo.ID}
s.sync.signalSyncDone(inProgressSyncK, nil)
return nil
}

inProgressSyncK := inProgressSyncKey{nextCid, s.peerID}
inProgressSyncK := inProgressSyncKey{nextCid, s.peerInfo.ID}
syncDone := s.sync.notifyOnSyncDone(inProgressSyncK)

log.Debugw("Starting data channel for message source", "cid", nextCid, "source_peer", s.peerID)
log.Debugw("Starting data channel for message source", "cid", nextCid, "source_peer", s.peerInfo.ID)

v := Voucher{&nextCid}
// Do not pass cancelable context into OpenPullDataChannel because a
// canceled context causes it to hang.
_, err := s.sync.dtManager.OpenPullDataChannel(context.Background(), s.peerID, v.AsVoucher(), nextCid, sel)
_, err := s.sync.dtManager.OpenPullDataChannel(context.Background(), s.peerInfo.ID, v.AsVoucher(), nextCid, sel)
if err != nil {
s.sync.signalSyncDone(inProgressSyncK, nil)
return fmt.Errorf("cannot open data channel: %w", err)
12 changes: 10 additions & 2 deletions dagsync/dtsync/syncer_test.go
@@ -81,7 +81,11 @@ func TestDTSync_CallsBlockHookWhenCIDsAreFullyFoundLocally(t *testing.T) {
t.Cleanup(func() { require.NoError(t, subject.Close()) })

// Sync l3 from the publisher.
syncer := subject.NewSyncer(pubh.ID(), topic)
pubInfo := peer.AddrInfo{
ID: pubh.ID(),
Addrs: pubh.Addrs(),
}
syncer := subject.NewSyncer(pubInfo, topic)
require.NoError(t, syncer.Sync(ctx, l3.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively))

// Assert there are three synced CIDs.
@@ -175,7 +179,11 @@ func TestDTSync_CallsBlockHookWhenCIDsArePartiallyFoundLocally(t *testing.T) {
t.Cleanup(func() { require.NoError(t, subject.Close()) })

// Sync l3 from the publisher.
syncer := subject.NewSyncer(pubh.ID(), topic)
pubInfo := peer.AddrInfo{
ID: pubh.ID(),
Addrs: pubh.Addrs(),
}
syncer := subject.NewSyncer(pubInfo, topic)
require.NoError(t, syncer.Sync(ctx, l3.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively))

// Assert there are three synced CIDs.
1 change: 1 addition & 0 deletions dagsync/interface.go
@@ -27,4 +27,5 @@ type Publisher interface {
type Syncer interface {
GetHead(context.Context) (cid.Cid, error)
Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
SameAddrs([]multiaddr.Multiaddr) bool
}
35 changes: 20 additions & 15 deletions dagsync/ipnisync/sync.go
@@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

@@ -50,11 +51,11 @@ type Sync struct {

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
client *http.Client
peerID peer.ID
rootURL url.URL
urls []*url.URL
sync *Sync
client *http.Client
peerInfo peer.AddrInfo
rootURL url.URL
urls []*url.URL
sync *Sync

// For legacy HTTP and external server support without IPNI path.
noPath bool
@@ -173,11 +174,11 @@ func (s *Sync) NewSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
}

return &Syncer{
client: httpClient,
peerID: peerInfo.ID,
rootURL: *urls[0],
urls: urls[1:],
sync: s,
client: httpClient,
peerInfo: peerInfo,
rootURL: *urls[0],
urls: urls[1:],
sync: s,

plainHTTP: plainHTTP,
}, nil
@@ -206,10 +207,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
if err != nil {
return cid.Undef, err
}
if s.peerID == "" {
if s.peerInfo.ID == "" {
log.Warn("Cannot verify publisher signature without peer ID")
} else if signerID != s.peerID {
return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerID.String(), signerID.String())
} else if signerID != s.peerInfo.ID {
return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerInfo.ID.String(), signerID.String())
}

// TODO: Check that the returned topic, if any, matches the expected topic.
@@ -222,6 +223,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return signedHead.Head.(cidlink.Link).Cid, nil
}

func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool {
return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs)
}

// Sync syncs the peer's advertisement chain or entries chain.
func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error {
xsel, err := selector.CompileSelector(sel)
@@ -244,7 +249,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
// hook at the end when we no longer care what it does with the blocks.
if s.sync.blockHook != nil {
for _, c := range cids {
s.sync.blockHook(s.peerID, c)
s.sync.blockHook(s.peerInfo.ID, c)
}
}

@@ -322,7 +327,7 @@ retry:
goto nextURL
}
if !doneRetry && errors.Is(err, network.ErrReset) {
log.Errorw("stream reset err, retrying", "publisher", s.peerID, "url", fetchURL.String())
log.Errorw("stream reset err, retrying", "publisher", s.peerInfo.ID, "url", fetchURL.String())
// Only retry the same fetch once.
doneRetry = true
goto retry
54 changes: 31 additions & 23 deletions dagsync/subscriber.go
@@ -160,6 +160,8 @@ type handler struct {
pendingMsg atomic.Pointer[announce.Announce]
// expires is the time the handler is removed if it remains idle.
expires time.Time
// syncer is a sync client for this handler's peer.
syncer Syncer
}

// wrapBlockHook wraps a possibly nil block hook func to allow a for
Expand Down Expand Up @@ -424,7 +426,9 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op

log := log.With("peer", peerInfo.ID)

syncer, updatePeerstore, err := s.makeSyncer(peerInfo, true)
hnd := s.getOrCreateHandler(peerInfo.ID)

syncer, updatePeerstore, err := hnd.makeSyncer(peerInfo, true)
if err != nil {
return cid.Undef, err
}
@@ -491,9 +495,6 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
segdl = opts.segDepthLimit
}

// Check for an existing handler for the specified peer (publisher). If
// none, create one if allowed.
hnd := s.getOrCreateHandler(peerInfo.ID)
sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk)

syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid)
Expand Down Expand Up @@ -570,17 +571,15 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en
return err
}

syncer, _, err := s.makeSyncer(peerInfo, false)
hnd := s.getOrCreateHandler(peerInfo.ID)

syncer, _, err := hnd.makeSyncer(peerInfo, false)
if err != nil {
return err
}

log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid)

// Check for an existing handler for the specified peer (publisher). If
// none, create one if allowed.
hnd := s.getOrCreateHandler(peerInfo.ID)

_, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef)
if err != nil {
return fmt.Errorf("sync handler failed: %w", err)
Expand Down Expand Up @@ -775,7 +774,9 @@ func delNotPresent(peerStore peerstore.Peerstore, peerID peer.ID, addrs []multia
}
}

func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) {
func (h *handler) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) {
s := h.subscriber

// Check for an HTTP address in peerAddrs, or if not given, in the http
// peerstore. This gives a preference to use ipnisync over dtsync.
var httpAddrs []multiaddr.Multiaddr
@@ -797,9 +798,12 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
ID: peerInfo.ID,
Addrs: httpAddrs,
}
syncer, err := s.ipniSync.NewSyncer(httpPeerInfo)
if err != nil {
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
if h.syncer == nil || !h.syncer.SameAddrs(httpAddrs) {
syncer, err := s.ipniSync.NewSyncer(httpPeerInfo)
if err != nil {
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
}
h.syncer = syncer
}
if doUpdate {
update = func() {
Expand All @@ -808,7 +812,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
s.httpPeerstore.AddAddrs(peerInfo.ID, httpAddrs, s.addrTTL)
}
}
return syncer, update, nil
return h.syncer, update, nil
}
if doUpdate {
peerStore := s.host.Peerstore()
@@ -827,16 +831,20 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
}
}

syncer, err := s.ipniSync.NewSyncer(peerInfo)
if err != nil {
if errors.Is(err, ipnisync.ErrNoHTTPServer) {
log.Warnw("Using data-transfer sync", "peer", peerInfo.ID, "reason", err.Error())
// Publisher is libp2p without HTTP, so use the dtSync.
return s.dtSync.NewSyncer(peerInfo.ID, s.topicName), update, nil
if h.syncer == nil || !h.syncer.SameAddrs(peerInfo.Addrs) {
syncer, err := s.ipniSync.NewSyncer(peerInfo)
if err != nil {
if errors.Is(err, ipnisync.ErrNoHTTPServer) {
log.Warnw("Using data-transfer sync", "peer", peerInfo.ID, "reason", err.Error())
// Publisher is libp2p without HTTP, so use the dtSync.
h.syncer = s.dtSync.NewSyncer(peerInfo, s.topicName)
return h.syncer, update, nil
}
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
}
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
h.syncer = syncer
}
return syncer, update, nil
return h.syncer, update, nil
}

// asyncSyncAdChain processes the latest announce message received over pubsub
@@ -854,7 +862,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
ID: amsg.PeerID,
Addrs: amsg.Addrs,
}
syncer, updatePeerstore, err := h.subscriber.makeSyncer(peerInfo, true)
syncer, updatePeerstore, err := h.makeSyncer(peerInfo, true)
if err != nil {
log.Errorw("Cannot make syncer for announce", "err", err, "peer", h.peerID)
return
24 changes: 24 additions & 0 deletions mautil/mautil.go
@@ -2,7 +2,9 @@
package mautil

import (
"bytes"
"net"
"slices"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
@@ -85,10 +87,32 @@ func CleanPeerAddrInfo(target peer.AddrInfo) peer.AddrInfo {
for i := 0; i < len(target.Addrs); {
if target.Addrs[i] == nil {
target.Addrs[i] = target.Addrs[len(target.Addrs)-1]
target.Addrs[len(target.Addrs)-1] = nil
target.Addrs = target.Addrs[:len(target.Addrs)-1]
continue
}
i++
}
return target
}

func MultiaddrsEqual(ma1, ma2 []multiaddr.Multiaddr) bool {
if len(ma1) != len(ma2) {
return false
}
if len(ma1) == 0 {
return true // both are nil or empty
}
if len(ma1) == 1 {
return ma1[0].Equal(ma2[0])
}
// Use slices package, as sort function does not allocate (sort.Slice does).
slices.SortFunc(ma1, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })
slices.SortFunc(ma2, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })
for i := 0; i < len(ma1); i++ {
if !ma1[i].Equal(ma2[i]) {
return false
}
}
return true
}
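The new comparison sorts both slices before an element-wise check, so address order does not matter; note that, as written, the sort reorders the caller's slices in place when they hold more than one address. Below is a small usage sketch, assuming MultiaddrsEqual is imported from github.com/ipni/go-libipni/mautil as added above; the example addresses are arbitrary.

package main

import (
    "fmt"

    "github.com/ipni/go-libipni/mautil"
    "github.com/multiformats/go-multiaddr"
)

func main() {
    a := multiaddr.StringCast("/ip4/127.0.0.1/tcp/3104")
    b := multiaddr.StringCast("/dns4/example.com/tcp/80/http")

    // The same address set in a different order compares equal, because
    // MultiaddrsEqual sorts both slices before comparing them.
    fmt.Println(mautil.MultiaddrsEqual(
        []multiaddr.Multiaddr{a, b},
        []multiaddr.Multiaddr{b, a},
    )) // true
}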