From fe0831e1df3e0f7f50d35308cbab1f14183423f6 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:57:34 -0700 Subject: [PATCH 1/5] Include optional Ipni-Cid-Schema-Type HTTP header This optional header, when present, serves as an indication to advertisement publishers what type of data is being requested and is identified by the CID. This may help some publishers more quickly lookup the data. The publisher, who receives the Ipni-Cid-Schema-Type HTTP header, does not validate the value, because newer values may need to be received by consumer that is using an older version of library. Implements fix for https://github.com/ipni/storetheindex/issues/2662 --- dagsync/ipnisync/cid_schema_hint.go | 24 ++++++++++++ dagsync/ipnisync/publisher.go | 14 ++++++- dagsync/ipnisync/sync.go | 16 ++++++++ dagsync/ipnisync/sync_test.go | 59 +++++++++++++++++++++++++++++ dagsync/subscriber.go | 3 ++ dagsync/test/util.go | 8 +++- 6 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 dagsync/ipnisync/cid_schema_hint.go diff --git a/dagsync/ipnisync/cid_schema_hint.go b/dagsync/ipnisync/cid_schema_hint.go new file mode 100644 index 0000000..839f2e1 --- /dev/null +++ b/dagsync/ipnisync/cid_schema_hint.go @@ -0,0 +1,24 @@ +package ipnisync + +const ( + // CidSchemaHeader is the HTTP header used as an optional hint about the + // type of data requested by a CID. + CidSchemaHeader = "Ipni-Cid-Schema-Type" + // CidSchemaAd is a value for the CidSchemaHeader specifying advertiesement + // data is being requested. + CidSchemaAd = "advertisement" + // CidSchemaEntries is a value for the CidSchemaHeader specifying + // advertisement entries (multihash chunks) data is being requested. + CidSchemaEntries = "entries" +) + +// cidSchemaTypeKey is the type used for the key of CidSchemaHeader when set as +// a context value. +type cidSchemaTypeCtxKey string + +// CidSchemaCtxKey is used as the key when creating a context with a value or extracting the cid schema from a context. Examples: +// +// ctx := context.WithValue(ctx, CidSchemaCtxKey, CidSchemaAd) +// +// cidSchemaType, ok := ctx.Value(CidSchemaCtxKey).(string) +const CidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index 358f01e..0f0297e 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -1,6 +1,7 @@ package ipnisync import ( + "context" "errors" "fmt" "net/http" @@ -41,6 +42,10 @@ var _ http.Handler = (*Publisher)(nil) // NewPublisher creates a new ipni-sync publisher. Optionally, a libp2p stream // host can be provided to serve HTTP over libp2p. +// +// If the publisher receives a request that contains a valid CidSchemaHeader +// header, then the ipld.Context passed to the lsys Load function contains a +// context that has that header's value stored under the CidSchemaCtxKey key. func NewPublisher(lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { opts, err := getOpts(options) if err != nil { @@ -218,7 +223,14 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "invalid request: not a cid", http.StatusBadRequest) return } - item, err := p.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + + ipldCtx := ipld.LinkContext{} + reqType := r.Header.Get(CidSchemaHeader) + if reqType != "" { + ipldCtx.Ctx = context.WithValue(context.Background(), CidSchemaCtxKey, reqType) + } + + item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { http.Error(w, "cid not found", http.StatusNotFound) diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index e4e18e4..fff68d2 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -226,6 +226,16 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error return fmt.Errorf("failed to compile selector: %w", err) } + // Check for valid cid schema type if set. + cidSchemaType, ok := ctx.Value(CidSchemaCtxKey).(string) + if ok { + switch cidSchemaType { + case CidSchemaAd, CidSchemaEntries: + default: + return fmt.Errorf("invalid cid schema type value: %s", cidSchemaType) + } + } + cids, err := s.walkFetch(ctx, nextCid, xsel) if err != nil { return fmt.Errorf("failed to traverse requested dag: %w", err) @@ -307,6 +317,12 @@ retry: return err } + // Value already checked in Sync. + reqType, ok := ctx.Value(CidSchemaCtxKey).(string) + if ok { + req.Header.Set(CidSchemaHeader, reqType) + } + resp, err := s.client.Do(req) if err != nil { if len(s.urls) != 0 { diff --git a/dagsync/ipnisync/sync_test.go b/dagsync/ipnisync/sync_test.go index c748b27..055e02f 100644 --- a/dagsync/ipnisync/sync_test.go +++ b/dagsync/ipnisync/sync_test.go @@ -230,3 +230,62 @@ func TestIPNIsync_NotFoundReturnsContentNotFoundErr(t *testing.T) { require.NotNil(t, err) require.Contains(t, err.Error(), "content not found") } + +func TestRequestTypeHint(t *testing.T) { + pubPrK, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader) + require.NoError(t, err) + pubID, err := peer.IDFromPrivateKey(pubPrK) + require.NoError(t, err) + + var lastReqTypeHint string + + // Instantiate a dagsync publisher. + publs := cidlink.DefaultLinkSystem() + + publs.StorageReadOpener = func(lnkCtx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { + if lnkCtx.Ctx != nil { + hint, ok := lnkCtx.Ctx.Value(ipnisync.CidSchemaCtxKey).(string) + require.True(t, ok) + require.NotEmpty(t, hint) + lastReqTypeHint = hint + t.Log("Request type hint:", hint) + } else { + lastReqTypeHint = "" + } + + require.NotEmpty(t, lastReqTypeHint, "missing expected context value") + return nil, ipld.ErrNotExists{} + } + + pub, err := ipnisync.NewPublisher(publs, pubPrK, ipnisync.WithHTTPListenAddrs("0.0.0.0:0")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, pub.Close()) }) + + ls := cidlink.DefaultLinkSystem() + store := &memstore.Store{} + ls.SetWriteStorage(store) + ls.SetReadStorage(store) + + sync := ipnisync.NewSync(ls, nil) + pubInfo := peer.AddrInfo{ + ID: pubID, + Addrs: pub.Addrs(), + } + syncer, err := sync.NewSyncer(pubInfo) + require.NoError(t, err) + + testCid, err := cid.Decode(sampleNFTStorageCid) + require.NoError(t, err) + + ctx := context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) + _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.Equal(t, ipnisync.CidSchemaAd, lastReqTypeHint) + + ctx = context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaEntries) + _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.Equal(t, ipnisync.CidSchemaEntries, lastReqTypeHint) + + ctx = context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, "bad") + err = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.ErrorContains(t, err, "invalid cid schema type value") +} diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 4170ad9..1c64335 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -488,6 +488,7 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) + ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { return cid.Undef, fmt.Errorf("sync handler failed: %w", err) @@ -571,6 +572,7 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) + ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaEntries) _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { return fmt.Errorf("sync handler failed: %w", err) @@ -872,6 +874,7 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { return } + ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid) if err != nil { diff --git a/dagsync/test/util.go b/dagsync/test/util.go index aa4bd8c..4e62f48 100644 --- a/dagsync/test/util.go +++ b/dagsync/test/util.go @@ -170,8 +170,12 @@ func encode(lsys ipld.LinkSystem, n ipld.Node) (ipld.Node, ipld.Link) { func MkLinkSystem(ds datastore.Batching) ipld.LinkSystem { lsys := cidlink.DefaultLinkSystem() - lsys.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { - val, err := ds.Get(context.Background(), datastore.NewKey(lnk.String())) + lsys.StorageReadOpener = func(ipldCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + ctx := ipldCtx.Ctx + if ctx == nil { + ctx = context.Background() + } + val, err := ds.Get(ctx, datastore.NewKey(lnk.String())) if err != nil { return nil, err } From 57beee1a1783dab28f57a3046c70cc1cc3a4040a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:55:48 -0700 Subject: [PATCH 2/5] Review changes --- dagsync/ipnisync/cid_schema_hint.go | 69 +++++++++++++++++++++--- dagsync/ipnisync/cid_schema_hint_test.go | 50 +++++++++++++++++ dagsync/ipnisync/publisher.go | 8 +-- dagsync/ipnisync/sync.go | 16 +++--- dagsync/ipnisync/sync_test.go | 20 +++---- dagsync/subscriber.go | 15 ++++-- 6 files changed, 145 insertions(+), 33 deletions(-) create mode 100644 dagsync/ipnisync/cid_schema_hint_test.go diff --git a/dagsync/ipnisync/cid_schema_hint.go b/dagsync/ipnisync/cid_schema_hint.go index 839f2e1..5062040 100644 --- a/dagsync/ipnisync/cid_schema_hint.go +++ b/dagsync/ipnisync/cid_schema_hint.go @@ -1,24 +1,77 @@ package ipnisync +import ( + "context" + "errors" +) + const ( // CidSchemaHeader is the HTTP header used as an optional hint about the // type of data requested by a CID. CidSchemaHeader = "Ipni-Cid-Schema-Type" - // CidSchemaAd is a value for the CidSchemaHeader specifying advertiesement - // data is being requested. - CidSchemaAd = "advertisement" + // CidSchemaAdvertisement is a value for the CidSchemaHeader specifying + // advertiesement data is being requested. Referrs to Advertisement in + // https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch + CidSchemaAdvertisement = "Advertisement" // CidSchemaEntries is a value for the CidSchemaHeader specifying // advertisement entries (multihash chunks) data is being requested. - CidSchemaEntries = "entries" + // Referrs to Entry chunk in + // https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch + CidSchemaEntryChunk = "EntryChunk" ) +var ErrUnknownCidSchema = errors.New("unknown cid schema type value") + // cidSchemaTypeKey is the type used for the key of CidSchemaHeader when set as // a context value. type cidSchemaTypeCtxKey string -// CidSchemaCtxKey is used as the key when creating a context with a value or extracting the cid schema from a context. Examples: +// cidSchemaCtxKey is used to get the key used to store or extract the cid +// schema value in a context. +const cidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader + +// CidSchemaFromCtx extracts the CID schema name from the context. If the +// scheam value is not set, then returns "". If the schema value is set, but is +// not recognized, then ErrUnknownCidSchema is returned along with the value. // -// ctx := context.WithValue(ctx, CidSchemaCtxKey, CidSchemaAd) +// Returning unrecognized values with an error allows consumers to retrieved +// newer values that are not recognized by an older version of this library. +func CidSchemaFromCtx(ctx context.Context) (string, error) { + if ctx == nil { + return "", nil + } + cidSchemaType, ok := ctx.Value(cidSchemaCtxKey).(string) + if !ok { + return "", nil + } + + var err error + switch cidSchemaType { + case CidSchemaAdvertisement, CidSchemaEntryChunk: + default: + err = ErrUnknownCidSchema + } + return cidSchemaType, err +} + +// CtxWithCidSchema creates a derived context that has the specified value for +// the CID schema type. // -// cidSchemaType, ok := ctx.Value(CidSchemaCtxKey).(string) -const CidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader +// Setting an unrecognized value, even when an error is retruned, allows +// producers to set context values that are not recognized by an older version +// of this library. +func CtxWithCidSchema(ctx context.Context, cidSchemaType string) (context.Context, error) { + if cidSchemaType == "" { + return ctx, nil + } + if ctx == nil { + ctx = context.Background() + } + var err error + switch cidSchemaType { + case CidSchemaAdvertisement, CidSchemaEntryChunk: + default: + err = ErrUnknownCidSchema + } + return context.WithValue(ctx, cidSchemaCtxKey, cidSchemaType), err +} diff --git a/dagsync/ipnisync/cid_schema_hint_test.go b/dagsync/ipnisync/cid_schema_hint_test.go new file mode 100644 index 0000000..b7fde9f --- /dev/null +++ b/dagsync/ipnisync/cid_schema_hint_test.go @@ -0,0 +1,50 @@ +package ipnisync_test + +import ( + "context" + "testing" + + "github.com/ipni/go-libipni/dagsync/ipnisync" + "github.com/stretchr/testify/require" +) + +func TestCtxWithCidSchema(t *testing.T) { + ctx, err := ipnisync.CtxWithCidSchema(nil, "") + require.NoError(t, err) + require.Nil(t, ctx) + + ctxOrig := context.Background() + ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, "") + require.NoError(t, err) + require.Equal(t, ctxOrig, ctx) + + ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, ipnisync.CidSchemaAdvertisement) + require.NoError(t, err) + require.NotEqual(t, ctxOrig, ctx) + + value, err := ipnisync.CidSchemaFromCtx(ctx) + require.NoError(t, err) + require.Equal(t, ipnisync.CidSchemaAdvertisement, value) + + ctx, err = ipnisync.CtxWithCidSchema(nil, ipnisync.CidSchemaEntryChunk) + require.NoError(t, err) + value, err = ipnisync.CidSchemaFromCtx(ctx) + require.NoError(t, err) + require.Equal(t, ipnisync.CidSchemaEntryChunk, value) + + value, err = ipnisync.CidSchemaFromCtx(ctxOrig) + require.NoError(t, err) + require.Empty(t, value) + + const unknownVal = "unknown" + + // Setting unknown value returns error as well as context with value. + ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, unknownVal) + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) + require.NotNil(t, ctxOrig, ctx) + + // Getting unknown value returns error as well as value. + value, err = ipnisync.CidSchemaFromCtx(ctx) + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) + require.Equal(t, unknownVal, value) +} diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index 0f0297e..f8b635a 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -1,7 +1,6 @@ package ipnisync import ( - "context" "errors" "fmt" "net/http" @@ -45,7 +44,7 @@ var _ http.Handler = (*Publisher)(nil) // // If the publisher receives a request that contains a valid CidSchemaHeader // header, then the ipld.Context passed to the lsys Load function contains a -// context that has that header's value stored under the CidSchemaCtxKey key. +// context that has that header's value retrievable with CidSchemaFromCtx. func NewPublisher(lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { opts, err := getOpts(options) if err != nil { @@ -227,7 +226,10 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { ipldCtx := ipld.LinkContext{} reqType := r.Header.Get(CidSchemaHeader) if reqType != "" { - ipldCtx.Ctx = context.WithValue(context.Background(), CidSchemaCtxKey, reqType) + ipldCtx.Ctx, err = CtxWithCidSchema(ipldCtx.Ctx, reqType) + if err != nil { + log.Warnw(err.Error(), "value", reqType) + } } item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any) diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index fff68d2..ceb8587 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -227,13 +227,9 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error } // Check for valid cid schema type if set. - cidSchemaType, ok := ctx.Value(CidSchemaCtxKey).(string) - if ok { - switch cidSchemaType { - case CidSchemaAd, CidSchemaEntries: - default: - return fmt.Errorf("invalid cid schema type value: %s", cidSchemaType) - } + _, err = CidSchemaFromCtx(ctx) + if err != nil { + return err } cids, err := s.walkFetch(ctx, nextCid, xsel) @@ -317,9 +313,9 @@ retry: return err } - // Value already checked in Sync. - reqType, ok := ctx.Value(CidSchemaCtxKey).(string) - if ok { + // Error already checked in Sync. + reqType, _ := CidSchemaFromCtx(ctx) + if reqType != "" { req.Header.Set(CidSchemaHeader, reqType) } diff --git a/dagsync/ipnisync/sync_test.go b/dagsync/ipnisync/sync_test.go index 055e02f..18606e1 100644 --- a/dagsync/ipnisync/sync_test.go +++ b/dagsync/ipnisync/sync_test.go @@ -244,11 +244,10 @@ func TestRequestTypeHint(t *testing.T) { publs.StorageReadOpener = func(lnkCtx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { if lnkCtx.Ctx != nil { - hint, ok := lnkCtx.Ctx.Value(ipnisync.CidSchemaCtxKey).(string) - require.True(t, ok) + hint, err := ipnisync.CidSchemaFromCtx(lnkCtx.Ctx) + require.NoError(t, err) require.NotEmpty(t, hint) lastReqTypeHint = hint - t.Log("Request type hint:", hint) } else { lastReqTypeHint = "" } @@ -277,15 +276,18 @@ func TestRequestTypeHint(t *testing.T) { testCid, err := cid.Decode(sampleNFTStorageCid) require.NoError(t, err) - ctx := context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) + ctx, err := ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaAdvertisement) + require.NoError(t, err) _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) - require.Equal(t, ipnisync.CidSchemaAd, lastReqTypeHint) + require.Equal(t, ipnisync.CidSchemaAdvertisement, lastReqTypeHint) - ctx = context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaEntries) + ctx, err = ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaEntryChunk) + require.NoError(t, err) _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) - require.Equal(t, ipnisync.CidSchemaEntries, lastReqTypeHint) + require.Equal(t, ipnisync.CidSchemaEntryChunk, lastReqTypeHint) - ctx = context.WithValue(context.Background(), ipnisync.CidSchemaCtxKey, "bad") + ctx, err = ipnisync.CtxWithCidSchema(context.Background(), "bad") + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) err = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) - require.ErrorContains(t, err, "invalid cid schema type value") + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) } diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 1c64335..e05e50e 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -488,7 +488,10 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) - ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { return cid.Undef, fmt.Errorf("sync handler failed: %w", err) @@ -572,7 +575,10 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) - ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaEntries) + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + if err != nil { + panic(err.Error()) + } _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { return fmt.Errorf("sync handler failed: %w", err) @@ -874,7 +880,10 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { return } - ctx = context.WithValue(ctx, ipnisync.CidSchemaCtxKey, ipnisync.CidSchemaAd) + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid) if err != nil { From bbbeb76664a8ca2a774187e63125752d14edb5cf Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 3 Sep 2024 20:14:55 -0700 Subject: [PATCH 3/5] review changes --- dagsync/announce_test.go | 4 ++-- dagsync/ipnisync/publisher.go | 20 +++++++++++++++++++- dagsync/option.go | 9 +++++++++ dagsync/subscriber.go | 31 ++++++++++++++++++++++--------- 4 files changed, 52 insertions(+), 12 deletions(-) diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 1740a0d..6db216c 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -34,7 +34,7 @@ func TestAnnounceReplace(t *testing.T) { } sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic), - dagsync.BlockHook(blockHook)) + dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false)) require.NoError(t, err) defer sub.Close() @@ -377,7 +377,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.WithCidSchemaHint(false), dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index f8b635a..7f46185 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -12,9 +12,11 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" + ipldmodel "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" + "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" @@ -226,13 +228,29 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { ipldCtx := ipld.LinkContext{} reqType := r.Header.Get(CidSchemaHeader) if reqType != "" { + log.Debug("sync request has cid schema type hint", "hint", reqType) ipldCtx.Ctx, err = CtxWithCidSchema(ipldCtx.Ctx, reqType) if err != nil { + // Log warning about unknown cid schema type, but continue on since + // the linksystem might recognize it. log.Warnw(err.Error(), "value", reqType) } } - item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + var ipldProto ipldmodel.NodePrototype + + switch reqType { + case CidSchemaAdvertisement: + ipldProto = schema.AdvertisementPrototype + case CidSchemaEntryChunk: + ipldProto = schema.EntryChunkPrototype + default: + ipldProto = basicnode.Prototype.Any + } + + //ipldProto = basicnode.Prototype.Any + + item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { http.Error(w, "cid not found", http.StatusNotFound) diff --git a/dagsync/option.go b/dagsync/option.go index f031481..e5dafe8 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -48,6 +48,7 @@ type config struct { firstSyncDepth int64 segDepthLimit int64 + cidSchemaHint bool strictAdsSelSeq bool httpTimeout time.Duration @@ -66,6 +67,7 @@ func getOpts(opts []Option) (config, error) { httpTimeout: defaultHttpTimeout, idleHandlerTTL: defaultIdleHandlerTTL, segDepthLimit: defaultSegDepthLimit, + cidSchemaHint: true, strictAdsSelSeq: true, } @@ -339,3 +341,10 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH } } } + +func WithCidSchemaHint(enable bool) Option { + return func(c *config) error { + c.cidSchemaHint = enable + return nil + } +} diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index e05e50e..d0a93b6 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -118,6 +118,10 @@ type Subscriber struct { // specifies the selection sequence itself. adsSelectorSeq ipld.Node + // cidSchemaHint enables sending the cid schema type hint as + // an HTTP header in sync requests. + cidSchemaHint bool + // selectorOne selects one multihash entries or HAMT block. selectorOne ipld.Node // selectorAll selects all multihash HAMT blocks. @@ -236,6 +240,8 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk })).Node(), + + cidSchemaHint: opts.cidSchemaHint, } if opts.strictAdsSelSeq { @@ -244,6 +250,7 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su }).Node() } else { s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node() + s.cidSchemaHint = false } if opts.hasRcvr { @@ -488,9 +495,11 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } } syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { @@ -575,9 +584,11 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) - if err != nil { - panic(err.Error()) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + if err != nil { + panic(err.Error()) + } } _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { @@ -880,9 +891,11 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { return } - ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) - if err != nil { - panic(err.Error()) + if h.subscriber.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } } sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid) From 53291a3a2f71be0ffc5645eae05c38d608169bfc Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:35:42 -0700 Subject: [PATCH 4/5] Do not pass or accpet nil ctx. Always set ipld prototype if type hint is present. --- dagsync/ipnisync/cid_schema_hint.go | 6 ------ dagsync/ipnisync/cid_schema_hint_test.go | 8 ++------ dagsync/ipnisync/publisher.go | 6 ++---- dagsync/ipnisync/sync.go | 26 +++++++++++++++++------- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/dagsync/ipnisync/cid_schema_hint.go b/dagsync/ipnisync/cid_schema_hint.go index 5062040..461c033 100644 --- a/dagsync/ipnisync/cid_schema_hint.go +++ b/dagsync/ipnisync/cid_schema_hint.go @@ -37,9 +37,6 @@ const cidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader // Returning unrecognized values with an error allows consumers to retrieved // newer values that are not recognized by an older version of this library. func CidSchemaFromCtx(ctx context.Context) (string, error) { - if ctx == nil { - return "", nil - } cidSchemaType, ok := ctx.Value(cidSchemaCtxKey).(string) if !ok { return "", nil @@ -64,9 +61,6 @@ func CtxWithCidSchema(ctx context.Context, cidSchemaType string) (context.Contex if cidSchemaType == "" { return ctx, nil } - if ctx == nil { - ctx = context.Background() - } var err error switch cidSchemaType { case CidSchemaAdvertisement, CidSchemaEntryChunk: diff --git a/dagsync/ipnisync/cid_schema_hint_test.go b/dagsync/ipnisync/cid_schema_hint_test.go index b7fde9f..8462cc7 100644 --- a/dagsync/ipnisync/cid_schema_hint_test.go +++ b/dagsync/ipnisync/cid_schema_hint_test.go @@ -9,12 +9,8 @@ import ( ) func TestCtxWithCidSchema(t *testing.T) { - ctx, err := ipnisync.CtxWithCidSchema(nil, "") - require.NoError(t, err) - require.Nil(t, ctx) - ctxOrig := context.Background() - ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, "") + ctx, err := ipnisync.CtxWithCidSchema(ctxOrig, "") require.NoError(t, err) require.Equal(t, ctxOrig, ctx) @@ -26,7 +22,7 @@ func TestCtxWithCidSchema(t *testing.T) { require.NoError(t, err) require.Equal(t, ipnisync.CidSchemaAdvertisement, value) - ctx, err = ipnisync.CtxWithCidSchema(nil, ipnisync.CidSchemaEntryChunk) + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) require.NoError(t, err) value, err = ipnisync.CidSchemaFromCtx(ctx) require.NoError(t, err) diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index 7f46185..ac96481 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -1,6 +1,7 @@ package ipnisync import ( + "context" "errors" "fmt" "net/http" @@ -229,7 +230,7 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { reqType := r.Header.Get(CidSchemaHeader) if reqType != "" { log.Debug("sync request has cid schema type hint", "hint", reqType) - ipldCtx.Ctx, err = CtxWithCidSchema(ipldCtx.Ctx, reqType) + ipldCtx.Ctx, err = CtxWithCidSchema(context.Background(), reqType) if err != nil { // Log warning about unknown cid schema type, but continue on since // the linksystem might recognize it. @@ -238,7 +239,6 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } var ipldProto ipldmodel.NodePrototype - switch reqType { case CidSchemaAdvertisement: ipldProto = schema.AdvertisementPrototype @@ -248,8 +248,6 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { ipldProto = basicnode.Prototype.Any } - //ipldProto = basicnode.Prototype.Any - item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index ceb8587..7bd7554 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -17,11 +17,13 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" + ipldmodel "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" + "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" "github.com/ipni/go-libipni/mautil" "github.com/libp2p/go-libp2p/core/network" @@ -227,12 +229,22 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error } // Check for valid cid schema type if set. - _, err = CidSchemaFromCtx(ctx) + reqType, err := CidSchemaFromCtx(ctx) if err != nil { return err } - cids, err := s.walkFetch(ctx, nextCid, xsel) + var ipldProto ipldmodel.NodePrototype + switch reqType { + case CidSchemaAdvertisement: + ipldProto = schema.AdvertisementPrototype + case CidSchemaEntryChunk: + ipldProto = schema.EntryChunkPrototype + default: + ipldProto = basicnode.Prototype.Any + } + + cids, err := s.walkFetch(ctx, nextCid, xsel, ipldProto) if err != nil { return fmt.Errorf("failed to traverse requested dag: %w", err) } @@ -258,7 +270,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // walkFetch is run by a traversal of the selector. For each block that the // selector walks over, walkFetch will look to see if it can find it in the // local data store. If it cannot, it will then go and get it over HTTP. -func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) { +func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto ipldmodel.NodePrototype) ([]cid.Cid, error) { // Track the order of cids seen during traversal so that the block hook // function gets called in the same order. var traversalOrder []cid.Cid @@ -269,7 +281,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) { c := l.(cidlink.Link).Cid // fetchBlock checks if the node is already present in storage. - err := s.fetchBlock(ctx, c) + err := s.fetchBlock(ctx, c, ipldProto) if err != nil { return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err) } @@ -291,7 +303,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se } // get the direct node. - rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any) + rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, ipldProto) if err != nil { return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err) } @@ -374,8 +386,8 @@ retry: } // fetchBlock fetches an item into the datastore at c if not locally available. -func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error { - n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) +func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto ipldmodel.NodePrototype) error { + n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto) // node is already present. if n != nil && err == nil { return nil From 42c96e2b038505483dca26788337afcd3835b890 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 5 Sep 2024 10:48:52 -0700 Subject: [PATCH 5/5] lint: fix import --- dagsync/ipnisync/publisher.go | 4 ++-- dagsync/ipnisync/sync.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index ac96481..7831bda 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -13,7 +13,7 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" - ipldmodel "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" @@ -238,7 +238,7 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - var ipldProto ipldmodel.NodePrototype + var ipldProto datamodel.NodePrototype switch reqType { case CidSchemaAdvertisement: ipldProto = schema.AdvertisementPrototype diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index 7bd7554..13a2b9a 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -17,7 +17,6 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" - ipldmodel "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal" @@ -234,7 +233,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error return err } - var ipldProto ipldmodel.NodePrototype + var ipldProto datamodel.NodePrototype switch reqType { case CidSchemaAdvertisement: ipldProto = schema.AdvertisementPrototype @@ -270,7 +269,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // walkFetch is run by a traversal of the selector. For each block that the // selector walks over, walkFetch will look to see if it can find it in the // local data store. If it cannot, it will then go and get it over HTTP. -func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto ipldmodel.NodePrototype) ([]cid.Cid, error) { +func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto datamodel.NodePrototype) ([]cid.Cid, error) { // Track the order of cids seen during traversal so that the block hook // function gets called in the same order. var traversalOrder []cid.Cid @@ -386,7 +385,7 @@ retry: } // fetchBlock fetches an item into the datastore at c if not locally available. -func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto ipldmodel.NodePrototype) error { +func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto datamodel.NodePrototype) error { n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto) // node is already present. if n != nil && err == nil {