From 6a234604b6b428998938a144a78f08877e317caa Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 3 Sep 2024 20:14:55 -0700 Subject: [PATCH] review changes --- dagsync/announce_test.go | 4 ++-- dagsync/ipnisync/publisher.go | 20 +++++++++++++++++++- dagsync/option.go | 9 +++++++++ dagsync/subscriber.go | 31 ++++++++++++++++++++++--------- 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 1740a0d..6db216c 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -34,7 +34,7 @@ func TestAnnounceReplace(t *testing.T) { } sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic), - dagsync.BlockHook(blockHook)) + dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false)) require.NoError(t, err) defer sub.Close() @@ -377,7 +377,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.WithCidSchemaHint(false), dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index f8b635a..7f46185 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -12,9 +12,11 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" + ipldmodel "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" + "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" @@ -226,13 +228,29 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { ipldCtx := ipld.LinkContext{} reqType := r.Header.Get(CidSchemaHeader) if reqType != "" { + log.Debug("sync request has cid schema type hint", "hint", reqType) ipldCtx.Ctx, err = CtxWithCidSchema(ipldCtx.Ctx, reqType) if err != nil { + // Log warning about unknown cid schema type, but continue on since + // the linksystem might recognize it. log.Warnw(err.Error(), "value", reqType) } } - item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + var ipldProto ipldmodel.NodePrototype + + switch reqType { + case CidSchemaAdvertisement: + ipldProto = schema.AdvertisementPrototype + case CidSchemaEntryChunk: + ipldProto = schema.EntryChunkPrototype + default: + ipldProto = basicnode.Prototype.Any + } + + //ipldProto = basicnode.Prototype.Any + + item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { http.Error(w, "cid not found", http.StatusNotFound) diff --git a/dagsync/option.go b/dagsync/option.go index f031481..e5dafe8 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -48,6 +48,7 @@ type config struct { firstSyncDepth int64 segDepthLimit int64 + cidSchemaHint bool strictAdsSelSeq bool httpTimeout time.Duration @@ -66,6 +67,7 @@ func getOpts(opts []Option) (config, error) { httpTimeout: defaultHttpTimeout, idleHandlerTTL: defaultIdleHandlerTTL, segDepthLimit: defaultSegDepthLimit, + cidSchemaHint: true, strictAdsSelSeq: true, } @@ -339,3 +341,10 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH } } } + +func WithCidSchemaHint(enable bool) Option { + return func(c *config) error { + c.cidSchemaHint = enable + return nil + } +} diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index e05e50e..d0a93b6 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -118,6 +118,10 @@ type Subscriber struct { // specifies the selection sequence itself. adsSelectorSeq ipld.Node + // cidSchemaHint enables sending the cid schema type hint as + // an HTTP header in sync requests. + cidSchemaHint bool + // selectorOne selects one multihash entries or HAMT block. selectorOne ipld.Node // selectorAll selects all multihash HAMT blocks. @@ -236,6 +240,8 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk })).Node(), + + cidSchemaHint: opts.cidSchemaHint, } if opts.strictAdsSelSeq { @@ -244,6 +250,7 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su }).Node() } else { s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node() + s.cidSchemaHint = false } if opts.hasRcvr { @@ -488,9 +495,11 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } } syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { @@ -575,9 +584,11 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) - if err != nil { - panic(err.Error()) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + if err != nil { + panic(err.Error()) + } } _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { @@ -880,9 +891,11 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { return } - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) + if h.subscriber.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } } sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)