diff --git a/dagsync/dtsync/sync.go b/dagsync/dtsync/sync.go index cece6a8..17f5953 100644 --- a/dagsync/dtsync/sync.go +++ b/dagsync/dtsync/sync.go @@ -102,9 +102,9 @@ func (s *Sync) Close() error { } // NewSyncer creates a new Syncer to use for a single sync operation against a peer. -func (s *Sync) NewSyncer(peerID peer.ID, topicName string) *Syncer { +func (s *Sync) NewSyncer(peerInfo peer.AddrInfo, topicName string) *Syncer { return &Syncer{ - peerID: peerID, + peerInfo: peerInfo, sync: s, topicName: topicName, ls: s.ls, diff --git a/dagsync/dtsync/syncer.go b/dagsync/dtsync/syncer.go index e8a8141..0c41ffe 100644 --- a/dagsync/dtsync/syncer.go +++ b/dagsync/dtsync/syncer.go @@ -13,12 +13,14 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipni/go-libipni/dagsync/dtsync/head" + "github.com/ipni/go-libipni/mautil" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" ) // Syncer handles a single sync with a provider. type Syncer struct { - peerID peer.ID + peerInfo peer.AddrInfo sync *Sync ls *ipld.LinkSystem topicName string @@ -26,7 +28,11 @@ type Syncer struct { // GetHead queries a provider for the latest CID. func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { - return head.QueryRootCid(ctx, s.sync.host, s.topicName, s.peerID) + return head.QueryRootCid(ctx, s.sync.host, s.topicName, s.peerInfo.ID) +} + +func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool { + return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs) } // Sync opens a datatransfer data channel and uses the selector to pull data @@ -42,21 +48,21 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // help with determining what the "next" CID would be if a DAG is partially // present. Similar to what SegmentSyncActions does. if cids, ok := s.has(ctx, nextCid, sel); ok { - s.sync.signalLocallyFoundCids(s.peerID, cids) - inProgressSyncK := inProgressSyncKey{nextCid, s.peerID} + s.sync.signalLocallyFoundCids(s.peerInfo.ID, cids) + inProgressSyncK := inProgressSyncKey{nextCid, s.peerInfo.ID} s.sync.signalSyncDone(inProgressSyncK, nil) return nil } - inProgressSyncK := inProgressSyncKey{nextCid, s.peerID} + inProgressSyncK := inProgressSyncKey{nextCid, s.peerInfo.ID} syncDone := s.sync.notifyOnSyncDone(inProgressSyncK) - log.Debugw("Starting data channel for message source", "cid", nextCid, "source_peer", s.peerID) + log.Debugw("Starting data channel for message source", "cid", nextCid, "source_peer", s.peerInfo.ID) v := Voucher{&nextCid} // Do not pass cancelable context into OpenPullDataChannel because a // canceled context causes it to hang. - _, err := s.sync.dtManager.OpenPullDataChannel(context.Background(), s.peerID, v.AsVoucher(), nextCid, sel) + _, err := s.sync.dtManager.OpenPullDataChannel(context.Background(), s.peerInfo.ID, v.AsVoucher(), nextCid, sel) if err != nil { s.sync.signalSyncDone(inProgressSyncK, nil) return fmt.Errorf("cannot open data channel: %w", err) diff --git a/dagsync/dtsync/syncer_test.go b/dagsync/dtsync/syncer_test.go index f0d79e6..01c9e95 100644 --- a/dagsync/dtsync/syncer_test.go +++ b/dagsync/dtsync/syncer_test.go @@ -81,7 +81,11 @@ func TestDTSync_CallsBlockHookWhenCIDsAreFullyFoundLocally(t *testing.T) { t.Cleanup(func() { require.NoError(t, subject.Close()) }) // Sync l3 from the publisher. - syncer := subject.NewSyncer(pubh.ID(), topic) + pubInfo := peer.AddrInfo{ + ID: pubh.ID(), + Addrs: pubh.Addrs(), + } + syncer := subject.NewSyncer(pubInfo, topic) require.NoError(t, syncer.Sync(ctx, l3.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively)) // Assert there are three synced CIDs. @@ -175,7 +179,11 @@ func TestDTSync_CallsBlockHookWhenCIDsArePartiallyFoundLocally(t *testing.T) { t.Cleanup(func() { require.NoError(t, subject.Close()) }) // Sync l3 from the publisher. - syncer := subject.NewSyncer(pubh.ID(), topic) + pubInfo := peer.AddrInfo{ + ID: pubh.ID(), + Addrs: pubh.Addrs(), + } + syncer := subject.NewSyncer(pubInfo, topic) require.NoError(t, syncer.Sync(ctx, l3.(cidlink.Link).Cid, selectorparse.CommonSelector_ExploreAllRecursively)) // Assert there are three synced CIDs. diff --git a/dagsync/interface.go b/dagsync/interface.go index 3fa6643..259cab0 100644 --- a/dagsync/interface.go +++ b/dagsync/interface.go @@ -27,4 +27,5 @@ type Publisher interface { type Syncer interface { GetHead(context.Context) (cid.Cid, error) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error + SameAddrs([]multiaddr.Multiaddr) bool } diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index dab2572..297cd51 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -27,6 +27,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2phttp "github.com/libp2p/go-libp2p/p2p/http" + "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) @@ -50,11 +51,11 @@ type Sync struct { // Syncer provides sync functionality for a single sync with a peer. type Syncer struct { - client *http.Client - peerID peer.ID - rootURL url.URL - urls []*url.URL - sync *Sync + client *http.Client + peerInfo peer.AddrInfo + rootURL url.URL + urls []*url.URL + sync *Sync // For legacy HTTP and external server support without IPNI path. noPath bool @@ -173,11 +174,11 @@ func (s *Sync) NewSyncer(peerInfo peer.AddrInfo) (*Syncer, error) { } return &Syncer{ - client: httpClient, - peerID: peerInfo.ID, - rootURL: *urls[0], - urls: urls[1:], - sync: s, + client: httpClient, + peerInfo: peerInfo, + rootURL: *urls[0], + urls: urls[1:], + sync: s, plainHTTP: plainHTTP, }, nil @@ -206,10 +207,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { if err != nil { return cid.Undef, err } - if s.peerID == "" { + if s.peerInfo.ID == "" { log.Warn("Cannot verify publisher signature without peer ID") - } else if signerID != s.peerID { - return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerID.String(), signerID.String()) + } else if signerID != s.peerInfo.ID { + return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerInfo.ID.String(), signerID.String()) } // TODO: Check that the returned topic, if any, matches the expected topic. @@ -222,6 +223,10 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { return signedHead.Head.(cidlink.Link).Cid, nil } +func (s *Syncer) SameAddrs(maddrs []multiaddr.Multiaddr) bool { + return mautil.MultiaddrsEqual(s.peerInfo.Addrs, maddrs) +} + // Sync syncs the peer's advertisement chain or entries chain. func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error { xsel, err := selector.CompileSelector(sel) @@ -244,7 +249,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // hook at the end when we no longer care what it does with the blocks. if s.sync.blockHook != nil { for _, c := range cids { - s.sync.blockHook(s.peerID, c) + s.sync.blockHook(s.peerInfo.ID, c) } } @@ -322,7 +327,7 @@ retry: goto nextURL } if !doneRetry && errors.Is(err, network.ErrReset) { - log.Errorw("stream reset err, retrying", "publisher", s.peerID, "url", fetchURL.String()) + log.Errorw("stream reset err, retrying", "publisher", s.peerInfo.ID, "url", fetchURL.String()) // Only retry the same fetch once. doneRetry = true goto retry diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 342d4aa..e765782 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -160,6 +160,8 @@ type handler struct { pendingMsg atomic.Pointer[announce.Announce] // expires is the time the handler is removed if it remains idle. expires time.Time + // syncer is a sync client for this handler's peer. + syncer Syncer } // wrapBlockHook wraps a possibly nil block hook func to allow a for @@ -424,7 +426,9 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op log := log.With("peer", peerInfo.ID) - syncer, updatePeerstore, err := s.makeSyncer(peerInfo, true) + hnd := s.getOrCreateHandler(peerInfo.ID) + + syncer, updatePeerstore, err := hnd.makeSyncer(peerInfo, true) if err != nil { return cid.Undef, err } @@ -491,9 +495,6 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op segdl = opts.segDepthLimit } - // Check for an existing handler for the specified peer (publisher). If - // none, create one if allowed. - hnd := s.getOrCreateHandler(peerInfo.ID) sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) @@ -570,17 +571,15 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en return err } - syncer, _, err := s.makeSyncer(peerInfo, false) + hnd := s.getOrCreateHandler(peerInfo.ID) + + syncer, _, err := hnd.makeSyncer(peerInfo, false) if err != nil { return err } log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) - // Check for an existing handler for the specified peer (publisher). If - // none, create one if allowed. - hnd := s.getOrCreateHandler(peerInfo.ID) - _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { return fmt.Errorf("sync handler failed: %w", err) @@ -775,7 +774,9 @@ func delNotPresent(peerStore peerstore.Peerstore, peerID peer.ID, addrs []multia } } -func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) { +func (h *handler) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, func(), error) { + s := h.subscriber + // Check for an HTTP address in peerAddrs, or if not given, in the http // peerstore. This gives a preference to use ipnisync over dtsync. var httpAddrs []multiaddr.Multiaddr @@ -797,9 +798,12 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, ID: peerInfo.ID, Addrs: httpAddrs, } - syncer, err := s.ipniSync.NewSyncer(httpPeerInfo) - if err != nil { - return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err) + if h.syncer == nil || !h.syncer.SameAddrs(httpAddrs) { + syncer, err := s.ipniSync.NewSyncer(httpPeerInfo) + if err != nil { + return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err) + } + h.syncer = syncer } if doUpdate { update = func() { @@ -808,7 +812,7 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, s.httpPeerstore.AddAddrs(peerInfo.ID, httpAddrs, s.addrTTL) } } - return syncer, update, nil + return h.syncer, update, nil } if doUpdate { peerStore := s.host.Peerstore() @@ -827,16 +831,20 @@ func (s *Subscriber) makeSyncer(peerInfo peer.AddrInfo, doUpdate bool) (Syncer, } } - syncer, err := s.ipniSync.NewSyncer(peerInfo) - if err != nil { - if errors.Is(err, ipnisync.ErrNoHTTPServer) { - log.Warnw("Using data-transfer sync", "peer", peerInfo.ID, "reason", err.Error()) - // Publisher is libp2p without HTTP, so use the dtSync. - return s.dtSync.NewSyncer(peerInfo.ID, s.topicName), update, nil + if h.syncer == nil || !h.syncer.SameAddrs(peerInfo.Addrs) { + syncer, err := s.ipniSync.NewSyncer(peerInfo) + if err != nil { + if errors.Is(err, ipnisync.ErrNoHTTPServer) { + log.Warnw("Using data-transfer sync", "peer", peerInfo.ID, "reason", err.Error()) + // Publisher is libp2p without HTTP, so use the dtSync. + h.syncer = s.dtSync.NewSyncer(peerInfo, s.topicName) + return h.syncer, update, nil + } + return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err) } - return nil, nil, fmt.Errorf("cannot create ipni-sync handler: %w", err) + h.syncer = syncer } - return syncer, update, nil + return h.syncer, update, nil } // asyncSyncAdChain processes the latest announce message received over pubsub @@ -854,7 +862,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { ID: amsg.PeerID, Addrs: amsg.Addrs, } - syncer, updatePeerstore, err := h.subscriber.makeSyncer(peerInfo, true) + syncer, updatePeerstore, err := h.makeSyncer(peerInfo, true) if err != nil { log.Errorw("Cannot make syncer for announce", "err", err, "peer", h.peerID) return diff --git a/mautil/mautil.go b/mautil/mautil.go index 4d5a9bc..92c703a 100644 --- a/mautil/mautil.go +++ b/mautil/mautil.go @@ -2,7 +2,9 @@ package mautil import ( + "bytes" "net" + "slices" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -85,6 +87,7 @@ func CleanPeerAddrInfo(target peer.AddrInfo) peer.AddrInfo { for i := 0; i < len(target.Addrs); { if target.Addrs[i] == nil { target.Addrs[i] = target.Addrs[len(target.Addrs)-1] + target.Addrs[len(target.Addrs)-1] = nil target.Addrs = target.Addrs[:len(target.Addrs)-1] continue } @@ -92,3 +95,24 @@ func CleanPeerAddrInfo(target peer.AddrInfo) peer.AddrInfo { } return target } + +func MultiaddrsEqual(ma1, ma2 []multiaddr.Multiaddr) bool { + if len(ma1) != len(ma2) { + return false + } + if len(ma1) == 0 { + return true // both are nil or empty + } + if len(ma1) == 1 { + return ma1[0].Equal(ma2[0]) + } + // Use slices package, as sort function does not allocate (sort.Slice does). + slices.SortFunc(ma1, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) + slices.SortFunc(ma2, func(a, b multiaddr.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) + for i := 0; i < len(ma1); i++ { + if !ma1[i].Equal(ma2[i]) { + return false + } + } + return true +} diff --git a/mautil/mautil_test.go b/mautil/mautil_test.go index 59c8479..3eb61a3 100644 --- a/mautil/mautil_test.go +++ b/mautil/mautil_test.go @@ -125,3 +125,54 @@ func TestCleanPeerAddrInfo(t *testing.T) { require.ElementsMatch(t, goodAddrs, cleaned.Addrs) }) } + +func TestMultiaddrsEqual(t *testing.T) { + maddrs, err := mautil.StringsToMultiaddrs([]string{ + "/ip4/11.0.0.0/tcp/80/http", + "/ip6/fc00::/tcp/1717", + "/ip6/fe00::/tcp/8080/https", + "/dns4/example.net/tcp/1234", + }) + require.NoError(t, err) + rev := make([]multiaddr.Multiaddr, len(maddrs)) + j := len(maddrs) - 1 + for i := 0; i <= j; i++ { + rev[i] = maddrs[j-i] + } + m1 := make([]multiaddr.Multiaddr, len(maddrs)) + m2 := make([]multiaddr.Multiaddr, len(maddrs)) + + copy(m1, maddrs) + copy(m2, rev) + require.True(t, mautil.MultiaddrsEqual(m1, m2)) + + copy(m1, maddrs) + copy(m2, rev) + require.Truef(t, mautil.MultiaddrsEqual(m1[1:], m2[:len(m2)-1]), "m1=%v, m2=%v", m1[1:], m2[:len(m2)-1]) + + copy(m1, maddrs) + copy(m2, rev) + require.True(t, mautil.MultiaddrsEqual(m1[2:3], m2[1:2])) + + copy(m1, maddrs) + copy(m2, rev) + require.True(t, mautil.MultiaddrsEqual(m1[:0], m2[:0])) + + require.True(t, mautil.MultiaddrsEqual(nil, nil)) + + copy(m1, maddrs) + copy(m2, rev) + require.True(t, mautil.MultiaddrsEqual(m1[:0], nil)) + + copy(m1, maddrs) + copy(m2, rev) + require.False(t, mautil.MultiaddrsEqual(m1[1:], m2[1:])) + + copy(m1, maddrs) + copy(m2, rev) + require.False(t, mautil.MultiaddrsEqual(m1, m2[:len(m2)-1])) + + copy(m1, maddrs) + copy(m2, rev) + require.False(t, mautil.MultiaddrsEqual(m1[:1], m2[:1])) +}