Skip to content

Commit

Permalink
Always use basicnode.Prototype.Any as not prototype. (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Sep 6, 2024
1 parent 416ed6d commit 3406b2c
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 66 deletions.
5 changes: 2 additions & 3 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestAnnounceReplace(t *testing.T) {
blocksSeenByHook[c] = struct{}{}
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
dagsync.BlockHook(blockHook), dagsync.WithCidSchemaHint(false))
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(), dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -458,7 +457,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.WithCidSchemaHint(false),
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

Expand Down
15 changes: 1 addition & 14 deletions dagsync/ipnisync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/maurl"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -229,7 +227,6 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ipldCtx := ipld.LinkContext{}
reqType := r.Header.Get(CidSchemaHeader)
if reqType != "" {
log.Debug("sync request has cid schema type hint", "hint", reqType)
ipldCtx.Ctx, err = CtxWithCidSchema(context.Background(), reqType)
if err != nil {
// Log warning about unknown cid schema type, but continue on since
Expand All @@ -238,17 +235,7 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

var ipldProto datamodel.NodePrototype
switch reqType {
case CidSchemaAdvertisement:
ipldProto = schema.AdvertisementPrototype
case CidSchemaEntryChunk:
ipldProto = schema.EntryChunkPrototype
default:
ipldProto = basicnode.Prototype.Any
}

item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, ipldProto)
item, err := p.lsys.Load(ipldCtx, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
if err != nil {
if errors.Is(err, ipld.ErrNotExists{}) {
http.Error(w, "cid not found", http.StatusNotFound)
Expand Down
25 changes: 7 additions & 18 deletions dagsync/ipnisync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/maurl"
"github.com/ipni/go-libipni/mautil"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -244,22 +243,12 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
}

// Check for valid cid schema type if set.
reqType, err := CidSchemaFromCtx(ctx)
_, err = CidSchemaFromCtx(ctx)
if err != nil {
return err
}

var ipldProto datamodel.NodePrototype
switch reqType {
case CidSchemaAdvertisement:
ipldProto = schema.AdvertisementPrototype
case CidSchemaEntryChunk:
ipldProto = schema.EntryChunkPrototype
default:
ipldProto = basicnode.Prototype.Any
}

cids, err := s.walkFetch(ctx, nextCid, xsel, ipldProto)
cids, err := s.walkFetch(ctx, nextCid, xsel)
if err != nil {
return fmt.Errorf("failed to traverse requested dag: %w", err)
}
Expand All @@ -285,7 +274,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
// walkFetch is run by a traversal of the selector. For each block that the
// selector walks over, walkFetch will look to see if it can find it in the
// local data store. If it cannot, it will then go and get it over HTTP.
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector, ipldProto datamodel.NodePrototype) ([]cid.Cid, error) {
func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Selector) ([]cid.Cid, error) {
// Track the order of cids seen during traversal so that the block hook
// function gets called in the same order.
var traversalOrder []cid.Cid
Expand All @@ -296,7 +285,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se
getMissingLs.StorageReadOpener = func(lc ipld.LinkContext, l ipld.Link) (io.Reader, error) {
c := l.(cidlink.Link).Cid
// fetchBlock checks if the node is already present in storage.
err := s.fetchBlock(ctx, c, ipldProto)
err := s.fetchBlock(ctx, c)
if err != nil {
return nil, fmt.Errorf("failed to fetch block for cid %s: %w", c, err)
}
Expand All @@ -318,7 +307,7 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se
}

// get the direct node.
rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, ipldProto)
rootNode, err := getMissingLs.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: rootCid}, basicnode.Prototype.Any)
if err != nil {
return nil, fmt.Errorf("failed to load node for root cid %s: %w", rootCid, err)
}
Expand Down Expand Up @@ -401,8 +390,8 @@ retry:
}

// fetchBlock fetches an item into the datastore at c if not locally available.
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid, ipldProto datamodel.NodePrototype) error {
n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, ipldProto)
func (s *Syncer) fetchBlock(ctx context.Context, c cid.Cid) error {
n, err := s.sync.lsys.Load(ipld.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}, basicnode.Prototype.Any)
// node is already present.
if n != nil && err == nil {
return nil
Expand Down
9 changes: 0 additions & 9 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type config struct {
gsMaxInRequests uint64
gsMaxOutRequests uint64

cidSchemaHint bool
strictAdsSelSeq bool

httpTimeout time.Duration
Expand All @@ -74,7 +73,6 @@ func getOpts(opts []Option) (config, error) {
segDepthLimit: defaultSegDepthLimit,
gsMaxInRequests: defaultGsMaxInRequests,
gsMaxOutRequests: defaultGsMaxOutRequests,
cidSchemaHint: true,
strictAdsSelSeq: true,
}

Expand Down Expand Up @@ -357,10 +355,3 @@ func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockH
}
}
}

func WithCidSchemaHint(enable bool) Option {
return func(c *config) error {
c.cidSchemaHint = enable
return nil
}
}
32 changes: 10 additions & 22 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ type Subscriber struct {
receiver *announce.Receiver
topicName string

// cidSchemaHint enables sending the cid schema type hint as
// an HTTP header in sync requests.
cidSchemaHint bool

// Track explicit Sync calls in progress and allow them to complete before
// closing subscriber.
expSyncClosed bool
Expand Down Expand Up @@ -248,8 +244,6 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Next", ssb.ExploreRecursiveEdge()) // Next field in EntryChunk
})).Node(),

cidSchemaHint: opts.cidSchemaHint,
}

if opts.strictAdsSelSeq {
Expand All @@ -258,7 +252,6 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
}).Node()
} else {
s.adsSelectorSeq = ssb.ExploreAll(ssb.ExploreRecursiveEdge()).Node()
s.cidSchemaHint = false
}

if opts.hasRcvr {
Expand Down Expand Up @@ -504,11 +497,9 @@ func (s *Subscriber) SyncAdChain(ctx context.Context, peerInfo peer.AddrInfo, op

sel := ExploreRecursiveWithStopNode(depthLimit, s.adsSelectorSeq, stopLnk)

if s.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
syncCount, err := hnd.handle(ctx, nextCid, sel, syncer, opts.blockHook, segdl, stopAtCid)
if err != nil {
Expand Down Expand Up @@ -593,11 +584,9 @@ func (s *Subscriber) syncEntries(ctx context.Context, peerInfo peer.AddrInfo, en

log.Debugw("Start entries sync", "peer", peerInfo.ID, "cid", entCid)

if s.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaEntryChunk)
if err != nil {
panic(err.Error())
}
_, err = hnd.handle(ctx, entCid, sel, syncer, bh, segdl, cid.Undef)
if err != nil {
Expand Down Expand Up @@ -901,11 +890,10 @@ func (h *handler) asyncSyncAdChain(ctx context.Context) {
log.Errorw("Cannot make syncer for announce", "err", err, "peer", h.peerID)
return
}
if h.subscriber.cidSchemaHint {
ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}

ctx, err = ipnisync.CtxWithCidSchema(ctx, ipnisync.CidSchemaAdvertisement)
if err != nil {
panic(err.Error())
}
sel := ExploreRecursiveWithStopNode(adsDepthLimit, h.subscriber.adsSelectorSeq, latestSyncLink)
syncCount, err := h.handle(ctx, nextCid, sel, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit, stopAtCid)
Expand Down

0 comments on commit 3406b2c

Please sign in to comment.