Reuse sync client unless peer addresses changed (#166)
A new sync client was being created for each advertisement whose entries were synced. This was not a problem for plain HTTP, since the HTTP client was reused. However, for libp2phttp a new namespaced client was created each time, which established another connection between the indexer and the index-provider. Long advertisement chains could eat up all network resources and cause indexing to grind to a halt.

This PR fixes that by reusing each index-provider's sync client to sync both advertisement chains and entries chains. The sync client is kept with the index-provider's dagsync subscription handler, and a new sync client is created only if there is no sync client yet or if the index-provider's addresses have changed.
gammazero authored Mar 8, 2024
1 parent a15d27c commit fa82b6f
Showing 5 changed files with 125 additions and 34 deletions.
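As a quick orientation before the per-file diffs, here is a minimal, self-contained sketch (not part of the commit) of the cache-or-recreate logic described above. The Syncer interface is reduced to the SameAddrs method this commit adds, and handler, stubSyncer, and getSyncer are illustrative stand-ins rather than the real dagsync types.

package main

import (
    "fmt"

    "github.com/multiformats/go-multiaddr"
)

// Syncer is reduced here to the one method this commit adds to the dagsync
// Syncer interface.
type Syncer interface {
    SameAddrs([]multiaddr.Multiaddr) bool
}

// stubSyncer stands in for the real ipnisync syncer; it only remembers the
// addresses it was created with and compares them element by element.
type stubSyncer struct {
    addrs []multiaddr.Multiaddr
}

func (s *stubSyncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool {
    if len(maddrs) != len(s.addrs) {
        return false
    }
    for i := range maddrs {
        if !maddrs[i].Equal(s.addrs[i]) {
            return false
        }
    }
    return true
}

// handler mirrors the dagsync handler that now keeps one syncer per publisher.
type handler struct {
    syncer Syncer
}

// getSyncer reuses the cached syncer unless the publisher's addresses have
// changed, which is the core of the fix.
func (h *handler) getSyncer(addrs []multiaddr.Multiaddr) Syncer {
    if h.syncer == nil || !h.syncer.SameAddrs(addrs) {
        h.syncer = &stubSyncer{addrs: addrs}
    }
    return h.syncer
}

func main() {
    a1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8080/http")
    a2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/9090/http")

    h := &handler{}
    first := h.getSyncer([]multiaddr.Multiaddr{a1})
    second := h.getSyncer([]multiaddr.Multiaddr{a1}) // same addresses: reused
    third := h.getSyncer([]multiaddr.Multiaddr{a2})  // changed addresses: recreated

    fmt.Println("reused:", first == second)   // true
    fmt.Println("recreated:", first != third) // true
}

In the actual change, this check appears twice in makeSyncer (now a method on the handler): once for the HTTP address path and once for the general case, as the subscriber.go diff below shows.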
1 change: 1 addition & 0 deletions dagsync/interface.go
@@ -27,4 +27,5 @@ type Publisher interface {
type Syncer interface {
GetHead(context.Context) (cid.Cid, error)
Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
SameAddrs([]multiaddr.Multiaddr) bool
}
35 changes: 20 additions & 15 deletions dagsync/ipnisync/sync.go
@@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

@@ -50,11 +51,11 @@ type Sync struct {

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
client *http.Client
peerID peer.ID
rootURL url.URL
urls []*url.URL
sync *Sync
client *http.Client
peerInfo peer.AddrInfo
rootURL url.URL
urls []*url.URL
sync *Sync

// For legacy HTTP and external server support without IPNI path.
noPath bool
@@ -157,11 +158,11 @@ func (s *Sync) NewSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
}

return &Syncer{
client: httpClient,
peerID: peerInfo.ID,
rootURL: *urls[0],
urls: urls[1:],
sync: s,
client: httpClient,
peerInfo: peerInfo,
rootURL: *urls[0],
urls: urls[1:],
sync: s,

plainHTTP: plainHTTP,
}, nil
@@ -190,10 +191,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
if err != nil {
return cid.Undef, err
}
if s.peerID == "" {
if s.peerInfo.ID == "" {
log.Warn("Cannot verify publisher signature without peer ID")
} else if signerID != s.peerID {
return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerID.String(), signerID.String())
} else if signerID != s.peerInfo.ID {
return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerInfo.ID.String(), signerID.String())
}

// TODO: Check that the returned topic, if any, matches the expected topic.
@@ -206,6 +207,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return signedHead.Head.(cidlink.Link).Cid, nil
}

func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool {
return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs)
}

// Sync syncs the peer's advertisement chain or entries chain.
func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error {
xsel, err := selector.CompileSelector(sel)
@@ -228,7 +233,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
// hook at the end when we no longer care what it does with the blocks.
if s.sync.blockHook != nil {
for _, c := range cids {
s.sync.blockHook(s.peerID, c)
s.sync.blockHook(s.peerInfo.ID, c)
}
}

@@ -306,7 +311,7 @@ retry:
goto nextURL
}
if !doneRetry && errors.Is(err, network.ErrReset) {
log.Errorw("stream reset err, retrying", "publisher", s.peerID, "url", fetchURL.String())
log.Errorw("stream reset err, retrying", "publisher", s.peerInfo.ID, "url", fetchURL.String())
// Only retry the same fetch once.
doneRetry = true
goto retry
48 changes: 29 additions & 19 deletions dagsync/subscriber.go
@@ -160,6 +160,8 @@ type handler struct {
pendingMsg atomic.Pointer[announce.Announce]
// expires is the time the handler is removed if it remains idle.
expires time.Time
// syncer is a sync client for this handler's peer.
syncer Syncer
}

// wrapBlockHook wraps a possibly nil block hook func to allow dispatching to a
@@ -415,7 +417,9 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op

log := log.With("peer", peerInfo.ID)

syncer, updatePeerstore, err := s.makeSyncer(peerInfo, true)
hnd := s.getOrCreateHandler(peerInfo.ID)

syncer, updatePeerstore, err := hnd.makeSyncer(peerInfo, true)
if err != nil {
return cid.Undef, err
}
@@ -482,8 +486,6 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op
segdl = opts.segDepthLimit
}

// Get existing or create new handler for the specified peer (publisher).
hnd := s.getOrCreateHandler(peerInfo.ID)
sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk)

syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid)
@@ -560,16 +562,15 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en
return err
}

syncer, _, err := s.makeSyncer(peerInfo, false)
hnd := s.getOrCreateHandler(peerInfo.ID)

syncer, _, err := hnd.makeSyncer(peerInfo, false)
if err != nil {
return err
}

log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid)

// Get existing or create new handler for the specified peer (publisher).
hnd := s.getOrCreateHandler(peerInfo.ID)

_, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef)
if err != nil {
return fmt.Errorf("sync handler failed: %w", err)
@@ -765,7 +766,9 @@ func delNotPresent(peerStore peerstore.Peerstore, peerID peer.ID, addrs []multia
}
}

func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) {
func (h *handler) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) {
s := h.subscriber

// Check for an HTTP address in peerAddrs, or if not given, in the http
// peerstore.
var httpAddrs []multiaddr.Multiaddr
@@ -787,9 +790,12 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
ID: peerInfo.ID,
Addrs: httpAddrs,
}
syncer, err := s.ipniSync.NewSyncer(httpPeerInfo)
if err != nil {
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
if h.syncer == nil || !h.syncer.SameAddrs(httpAddrs) {
syncer, err := s.ipniSync.NewSyncer(httpPeerInfo)
if err != nil {
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
}
h.syncer = syncer
}
if doUpdate {
update = func() {
@@ -798,7 +804,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
s.httpPeerstore.AddAddrs(peerInfo.ID, httpAddrs, s.addrTTL)
}
}
return syncer, update, nil
return h.syncer, update, nil
}
if doUpdate {
peerStore := s.host.Peerstore()
@@ -817,14 +823,17 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer,
}
}

syncer, err := s.ipniSync.NewSyncer(peerInfo)
if err != nil {
if errors.Is(err, ipnisync.ErrNoHTTPServer) {
err = fmt.Errorf("data-transfer sync is not supported: %w", err)
if h.syncer == nil || !h.syncer.SameAddrs(peerInfo.Addrs) {
syncer, err := s.ipniSync.NewSyncer(peerInfo)
if err != nil {
if errors.Is(err, ipnisync.ErrNoHTTPServer) {
err = fmt.Errorf("data-transfer sync is not supported: %w", err)
}
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
}
return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err)
h.syncer = syncer
}
return syncer, update, nil
return h.syncer, update, nil
}

// asyncSyncAdChain processes the latest announce message received over pubsub
@@ -842,7 +851,8 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
ID: amsg.PeerID,
Addrs: amsg.Addrs,
}
syncer, updatePeerstore, err := h.subscriber.makeSyncer(peerInfo, true)

syncer, updatePeerstore, err := h.makeSyncer(peerInfo, true)
if err != nil {
log.Errorw("Cannot make syncer for announce", "err", err, "peer", h.peerID)
return
24 changes: 24 additions & 0 deletions mautil/mautil.go
@@ -2,7 +2,9 @@
package mautil

import (
"bytes"
"net"
"slices"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
@@ -85,10 +87,32 @@ func CleanPeerAddrInfo(target peer.AddrInfo) peer.AddrInfo {
for i := 0; i < len(target.Addrs); {
if target.Addrs[i] == nil {
target.Addrs[i] = target.Addrs[len(target.Addrs)-1]
target.Addrs[len(target.Addrs)-1] = nil
target.Addrs = target.Addrs[:len(target.Addrs)-1]
continue
}
i++
}
return target
}

func MultiaddrsEqual(ma1, ma2 []multiaddr.Multiaddr) bool {
if len(ma1) != len(ma2) {
return false
}
if len(ma1) == 0 {
return true // both are nil or empty
}
if len(ma1) == 1 {
return ma1[0].Equal(ma2[0])
}
// Use slices package, as sort function does not allocate (sort.Slice does).
slices.SortFunc(ma1, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })
slices.SortFunc(ma2, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })
for i := 0; i < len(ma1); i++ {
if !ma1[i].Equal(ma2[i]) {
return false
}
}
return true
}
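For reference, here is a small standalone usage sketch of the new helper (not part of the commit), assuming the module path github.com/ipni/go-libipni; note that for inputs longer than one element the helper sorts the slices it is given in place.

package main

import (
    "fmt"

    "github.com/ipni/go-libipni/mautil"
    "github.com/multiformats/go-multiaddr"
)

func main() {
    maddrs, err := mautil.StringsToMultiaddrs([]string{
        "/ip4/127.0.0.1/tcp/8080/http",
        "/ip6/fc00::/tcp/1717",
    })
    if err != nil {
        panic(err)
    }

    // The same addresses in a different order compare as equal.
    x := []multiaddr.Multiaddr{maddrs[0], maddrs[1]}
    y := []multiaddr.Multiaddr{maddrs[1], maddrs[0]}
    fmt.Println(mautil.MultiaddrsEqual(x, y)) // true

    // A length mismatch is unequal without any element comparison.
    fmt.Println(mautil.MultiaddrsEqual(x, maddrs[:1])) // false
}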
51 changes: 51 additions & 0 deletions mautil/mautil_test.go
@@ -125,3 +125,54 @@ func TestCleanPeerAddrInfo(t *testing.T) {
require.ElementsMatch(t, goodAddrs, cleaned.Addrs)
})
}

func TestMultiaddrsEqual(t *testing.T) {
maddrs, err := mautil.StringsToMultiaddrs([]string{
"/ip4/11.0.0.0/tcp/80/http",
"/ip6/fc00::/tcp/1717",
"/ip6/fe00::/tcp/8080/https",
"/dns4/example.net/tcp/1234",
})
require.NoError(t, err)
rev := make([]multiaddr.Multiaddr, len(maddrs))
j := len(maddrs) - 1
for i := 0; i <= j; i++ {
rev[i] = maddrs[j-i]
}
m1 := make([]multiaddr.Multiaddr, len(maddrs))
m2 := make([]multiaddr.Multiaddr, len(maddrs))

copy(m1, maddrs)
copy(m2, rev)
require.True(t, mautil.MultiaddrsEqual(m1, m2))

copy(m1, maddrs)
copy(m2, rev)
require.Truef(t, mautil.MultiaddrsEqual(m1[1:], m2[:len(m2)-1]), "m1=%v, m2=%v", m1[1:], m2[:len(m2)-1])

copy(m1, maddrs)
copy(m2, rev)
require.True(t, mautil.MultiaddrsEqual(m1[2:3], m2[1:2]))

copy(m1, maddrs)
copy(m2, rev)
require.True(t, mautil.MultiaddrsEqual(m1[:0], m2[:0]))

require.True(t, mautil.MultiaddrsEqual(nil, nil))

copy(m1, maddrs)
copy(m2, rev)
require.True(t, mautil.MultiaddrsEqual(m1[:0], nil))

copy(m1, maddrs)
copy(m2, rev)
require.False(t, mautil.MultiaddrsEqual(m1[1:], m2[1:]))

copy(m1, maddrs)
copy(m2, rev)
require.False(t, mautil.MultiaddrsEqual(m1, m2[:len(m2)-1]))

copy(m1, maddrs)
copy(m2, rev)
require.False(t, mautil.MultiaddrsEqual(m1[:1], m2[:1]))
}
