diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 1740a0d..6db216c 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -34,7 +34,7 @@ func TestAnnounceReplace(t *testing.T) { } sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic), - dagsync.BlockHook(blockHook)) + dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false)) require.NoError(t, err) defer sub.Close() @@ -377,7 +377,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour) dstLnkS := test.MkLinkSystem(dstStore) - sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, + sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.WithCidSchemaHint(false), dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer))) require.NoError(t, err) diff --git a/dagsync/ipnisync/cid_schema_hint.go b/dagsync/ipnisync/cid_schema_hint.go new file mode 100644 index 0000000..461c033 --- /dev/null +++ b/dagsync/ipnisync/cid_schema_hint.go @@ -0,0 +1,71 @@ +package ipnisync + +import ( + "context" + "errors" +) + +const ( + // CidSchemaHeader is the HTTP header used as an optional hint about the + // type of data requested by a CID. + CidSchemaHeader = "Ipni-Cid-Schema-Type" + // CidSchemaAdvertisement is a value for the CidSchemaHeader specifying + // advertisement data is being requested. Refers to Advertisement in + // https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch + CidSchemaAdvertisement = "Advertisement" + // CidSchemaEntryChunk is a value for the CidSchemaHeader specifying + // advertisement entries (multihash chunks) data is being requested. 
+ // Refers to EntryChunk in + // https://github.com/ipni/go-libipni/blob/main/ingest/schema/schema.ipldsch + CidSchemaEntryChunk = "EntryChunk" +) + +var ErrUnknownCidSchema = errors.New("unknown cid schema type value") + +// cidSchemaTypeCtxKey is the type used for the key of CidSchemaHeader when set as +// a context value. +type cidSchemaTypeCtxKey string + +// cidSchemaCtxKey is used to get the key used to store or extract the cid +// schema value in a context. +const cidSchemaCtxKey cidSchemaTypeCtxKey = CidSchemaHeader + +// CidSchemaFromCtx extracts the CID schema name from the context. If the +// schema value is not set, then returns "". If the schema value is set, but is +// not recognized, then ErrUnknownCidSchema is returned along with the value. +// +// Returning unrecognized values with an error allows consumers to retrieve +// newer values that are not recognized by an older version of this library. +func CidSchemaFromCtx(ctx context.Context) (string, error) { + cidSchemaType, ok := ctx.Value(cidSchemaCtxKey).(string) + if !ok { + return "", nil + } + + var err error + switch cidSchemaType { + case CidSchemaAdvertisement, CidSchemaEntryChunk: + default: + err = ErrUnknownCidSchema + } + return cidSchemaType, err +} + +// CtxWithCidSchema creates a derived context that has the specified value for +// the CID schema type. +// +// Setting an unrecognized value, even when an error is returned, allows +// producers to set context values that are not recognized by an older version +// of this library. 
+func CtxWithCidSchema(ctx context.Context, cidSchemaType string) (context.Context, error) { + if cidSchemaType == "" { + return ctx, nil + } + var err error + switch cidSchemaType { + case CidSchemaAdvertisement, CidSchemaEntryChunk: + default: + err = ErrUnknownCidSchema + } + return context.WithValue(ctx, cidSchemaCtxKey, cidSchemaType), err +} diff --git a/dagsync/ipnisync/cid_schema_hint_test.go b/dagsync/ipnisync/cid_schema_hint_test.go new file mode 100644 index 0000000..8462cc7 --- /dev/null +++ b/dagsync/ipnisync/cid_schema_hint_test.go @@ -0,0 +1,46 @@ +package ipnisync_test + +import ( + "context" + "testing" + + "github.com/ipni/go-libipni/dagsync/ipnisync" + "github.com/stretchr/testify/require" +) + +func TestCtxWithCidSchema(t *testing.T) { + ctxOrig := context.Background() + ctx, err := ipnisync.CtxWithCidSchema(ctxOrig, "") + require.NoError(t, err) + require.Equal(t, ctxOrig, ctx) + + ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, ipnisync.CidSchemaAdvertisement) + require.NoError(t, err) + require.NotEqual(t, ctxOrig, ctx) + + value, err := ipnisync.CidSchemaFromCtx(ctx) + require.NoError(t, err) + require.Equal(t, ipnisync.CidSchemaAdvertisement, value) + + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + require.NoError(t, err) + value, err = ipnisync.CidSchemaFromCtx(ctx) + require.NoError(t, err) + require.Equal(t, ipnisync.CidSchemaEntryChunk, value) + + value, err = ipnisync.CidSchemaFromCtx(ctxOrig) + require.NoError(t, err) + require.Empty(t, value) + + const unknownVal = "unknown" + + // Setting unknown value returns error as well as a derived context with value. + ctx, err = ipnisync.CtxWithCidSchema(ctxOrig, unknownVal) + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) + require.NotEqual(t, ctxOrig, ctx) + + // Getting unknown value returns error as well as value. 
+ value, err = ipnisync.CidSchemaFromCtx(ctx) + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) + require.Equal(t, unknownVal, value) +} diff --git a/dagsync/ipnisync/publisher.go b/dagsync/ipnisync/publisher.go index 358f01e..7831bda 100644 --- a/dagsync/ipnisync/publisher.go +++ b/dagsync/ipnisync/publisher.go @@ -1,6 +1,7 @@ package ipnisync import ( + "context" "errors" "fmt" "net/http" @@ -12,9 +13,11 @@ import ( "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" + "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" @@ -41,6 +44,10 @@ var _ http.Handler = (*Publisher)(nil) // NewPublisher creates a new ipni-sync publisher. Optionally, a libp2p stream // host can be provided to serve HTTP over libp2p. +// +// If the publisher receives a request that contains a valid CidSchemaHeader +// header, then the ipld.Context passed to the lsys Load function contains a +// context that has that header's value retrievable with CidSchemaFromCtx. 
func NewPublisher(lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { opts, err := getOpts(options) if err != nil { @@ -218,7 +225,30 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "invalid request: not a cid", http.StatusBadRequest) return } - item, err := p.lsys.Load(ipld.LinkContext{}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) + + ipldCtx := ipld.LinkContext{} + reqType := r.Header.Get(CidSchemaHeader) + if reqType != "" { + log.Debugw("sync request has cid schema type hint", "hint", reqType) + ipldCtx.Ctx, err = CtxWithCidSchema(context.Background(), reqType) + if err != nil { + // Log warning about unknown cid schema type, but continue on since + // the linksystem might recognize it. + log.Warnw(err.Error(), "value", reqType) + } + } + + var ipldProto datamodel.NodePrototype + switch reqType { + case CidSchemaAdvertisement: + ipldProto = schema.AdvertisementPrototype + case CidSchemaEntryChunk: + ipldProto = schema.EntryChunkPrototype + default: + ipldProto = basicnode.Prototype.Any + } + + item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto) if err != nil { if errors.Is(err, ipld.ErrNotExists{}) { http.Error(w, "cid not found", http.StatusNotFound) diff --git a/dagsync/ipnisync/sync.go b/dagsync/ipnisync/sync.go index e4e18e4..13a2b9a 100644 --- a/dagsync/ipnisync/sync.go +++ b/dagsync/ipnisync/sync.go @@ -22,6 +22,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head" + "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/maurl" "github.com/ipni/go-libipni/mautil" "github.com/libp2p/go-libp2p/core/network" @@ -226,7 +227,23 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error return fmt.Errorf("failed to compile selector: %w", err) } - cids, err := s.walkFetch(ctx, nextCid, xsel) + // Check for valid cid 
schema type if set. + reqType, err := CidSchemaFromCtx(ctx) + if err != nil { + return err + } + + var ipldProto datamodel.NodePrototype + switch reqType { + case CidSchemaAdvertisement: + ipldProto = schema.AdvertisementPrototype + case CidSchemaEntryChunk: + ipldProto = schema.EntryChunkPrototype + default: + ipldProto = basicnode.Prototype.Any + } + + cids, err := s.walkFetch(ctx, nextCid, xsel, ipldProto) if err != nil { return fmt.Errorf("failed to traverse requested dag: %w", err) } @@ -252,7 +269,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error // walkFetch is run by a traversal of the selector. For each block that the // selector walks over, walkFetch will look to see if it can find it in the // local data store. If it cannot, it will then go and get it over HTTP. -func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) { +func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto datamodel.NodePrototype) ([]cid.Cid, error) { // Track the order of cids seen during traversal so that the block hook // function gets called in the same order. var traversalOrder []cid.Cid @@ -263,7 +280,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) { c := l.(cidlink.Link).Cid // fetchBlock checks if the node is already present in storage. - err := s.fetchBlock(ctx, c) + err := s.fetchBlock(ctx, c, ipldProto) if err != nil { return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err) } @@ -285,7 +302,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se } // get the direct node. 
- rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any) + rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, ipldProto) if err != nil { return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err) } @@ -307,6 +324,12 @@ retry: return err } + // Error already checked in Sync. + reqType, _ := CidSchemaFromCtx(ctx) + if reqType != "" { + req.Header.Set(CidSchemaHeader, reqType) + } + resp, err := s.client.Do(req) if err != nil { if len(s.urls) != 0 { @@ -362,8 +385,8 @@ retry: } // fetchBlock fetches an item into the datastore at c if not locally available. -func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error { - n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any) +func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto datamodel.NodePrototype) error { + n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto) // node is already present. if n != nil && err == nil { return nil diff --git a/dagsync/ipnisync/sync_test.go b/dagsync/ipnisync/sync_test.go index c748b27..18606e1 100644 --- a/dagsync/ipnisync/sync_test.go +++ b/dagsync/ipnisync/sync_test.go @@ -230,3 +230,64 @@ func TestIPNIsync_NotFoundReturnsContentNotFoundErr(t *testing.T) { require.NotNil(t, err) require.Contains(t, err.Error(), "content not found") } + +func TestRequestTypeHint(t *testing.T) { + pubPrK, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader) + require.NoError(t, err) + pubID, err := peer.IDFromPrivateKey(pubPrK) + require.NoError(t, err) + + var lastReqTypeHint string + + // Instantiate a dagsync publisher. 
+ publs := cidlink.DefaultLinkSystem() + + publs.StorageReadOpener = func(lnkCtx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { + if lnkCtx.Ctx != nil { + hint, err := ipnisync.CidSchemaFromCtx(lnkCtx.Ctx) + require.NoError(t, err) + require.NotEmpty(t, hint) + lastReqTypeHint = hint + } else { + lastReqTypeHint = "" + } + + require.NotEmpty(t, lastReqTypeHint, "missing expected context value") + return nil, ipld.ErrNotExists{} + } + + pub, err := ipnisync.NewPublisher(publs, pubPrK, ipnisync.WithHTTPListenAddrs("0.0.0.0:0")) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, pub.Close()) }) + + ls := cidlink.DefaultLinkSystem() + store := &memstore.Store{} + ls.SetWriteStorage(store) + ls.SetReadStorage(store) + + sync := ipnisync.NewSync(ls, nil) + pubInfo := peer.AddrInfo{ + ID: pubID, + Addrs: pub.Addrs(), + } + syncer, err := sync.NewSyncer(pubInfo) + require.NoError(t, err) + + testCid, err := cid.Decode(sampleNFTStorageCid) + require.NoError(t, err) + + ctx, err := ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaAdvertisement) + require.NoError(t, err) + _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.Equal(t, ipnisync.CidSchemaAdvertisement, lastReqTypeHint) + + ctx, err = ipnisync.CtxWithCidSchema(context.Background(), ipnisync.CidSchemaEntryChunk) + require.NoError(t, err) + _ = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.Equal(t, ipnisync.CidSchemaEntryChunk, lastReqTypeHint) + + ctx, err = ipnisync.CtxWithCidSchema(context.Background(), "bad") + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) + err = syncer.Sync(ctx, testCid, selectorparse.CommonSelector_MatchPoint) + require.ErrorIs(t, err, ipnisync.ErrUnknownCidSchema) +} diff --git a/dagsync/option.go b/dagsync/option.go index f031481..e5dafe8 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -48,6 +48,7 @@ type config struct { firstSyncDepth int64 segDepthLimit int64 + 
cidSchemaHint bool strictAdsSelSeq bool httpTimeout time.Duration @@ -66,6 +67,7 @@ func getOpts(opts []Option) (config, error) { httpTimeout: defaultHttpTimeout, idleHandlerTTL: defaultIdleHandlerTTL, segDepthLimit: defaultSegDepthLimit, + cidSchemaHint: true, strictAdsSelSeq: true, } @@ -339,3 +341,12 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH } } } + +// WithCidSchemaHint enables or disables sending the cid schema type hint as +// an HTTP header in sync requests. Default is true (enabled). +func WithCidSchemaHint(enable bool) Option { + return func(c *config) error { + c.cidSchemaHint = enable + return nil + } +} diff --git a/dagsync/subscriber.go b/dagsync/subscriber.go index 4170ad9..d0a93b6 100644 --- a/dagsync/subscriber.go +++ b/dagsync/subscriber.go @@ -118,6 +118,10 @@ type Subscriber struct { // specifies the selection sequence itself. adsSelectorSeq ipld.Node + // cidSchemaHint enables sending the cid schema type hint as + // an HTTP header in sync requests. + cidSchemaHint bool + // selectorOne selects one multihash entries or HAMT block. selectorOne ipld.Node // selectorAll selects all multihash HAMT blocks. 
@@ -236,6 +240,8 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) { efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk })).Node(), + + cidSchemaHint: opts.cidSchemaHint, } if opts.strictAdsSelSeq { @@ -244,6 +250,7 @@ func NewSubscriber(host host.Host, lsys ipld.LinkSystem, options ...Option) (*Su }).Node() } else { s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node() + s.cidSchemaHint = false } if opts.hasRcvr { @@ -488,6 +495,12 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } + } syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid) if err != nil { return cid.Undef, fmt.Errorf("sync handler failed: %w", err) @@ -571,6 +584,12 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid) + if s.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk) + if err != nil { + panic(err.Error()) + } + } _, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef) if err != nil { return fmt.Errorf("sync handler failed: %w", err) @@ -872,6 +891,12 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) { return } + if h.subscriber.cidSchemaHint { + ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement) + if err != nil { + panic(err.Error()) + } + } sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink) syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid) if err != nil { diff --git 
a/dagsync/test/util.go b/dagsync/test/util.go index aa4bd8c..4e62f48 100644 --- a/dagsync/test/util.go +++ b/dagsync/test/util.go @@ -170,8 +170,12 @@ func encode(lsys ipld.LinkSystem, n ipld.Node) (ipld.Node, ipld.Link) { func MkLinkSystem(ds datastore.Batching) ipld.LinkSystem { lsys := cidlink.DefaultLinkSystem() - lsys.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { - val, err := ds.Get(context.Background(), datastore.NewKey(lnk.String())) + lsys.StorageReadOpener = func(ipldCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + ctx := ipldCtx.Ctx + if ctx == nil { + ctx = context.Background() + } + val, err := ds.Get(ctx, datastore.NewKey(lnk.String())) if err != nil { return nil, err }