diff --git a/dagsync/option.go b/dagsync/option.go index 3c2d49e..70dc044 100644 --- a/dagsync/option.go +++ b/dagsync/option.go @@ -310,3 +310,35 @@ func ScopedBlockHook(hook BlockHookFunc) SyncOption { sc.blockHook = hook } } + +// MakeGeneralBlockHook creates a block hook function that sets the next sync +// action based on whether the specified advertisement has a previous +// advertisement in the chain.. +// +// Use this when segmented sync is enabled and no other blockhook is defined. +// +// The supplied prevAdCid function takes the CID of the current advertisement +// and returns the CID of the previous advertisement in the chain. This would +// typically be done my loading the specified advertisement from the +// ipld.LinkSystem and getting the previous CID. +func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockHookFunc { + return func(_ peer.ID, adCid cid.Cid, actions SegmentSyncActions) { + // The only kind of block we should get by loading CIDs here should be + // Advertisement. + // + // Because: + // - The default subscription selector only selects advertisements. + // - Entries are synced with an explicit selector separate from + // advertisement syncs and should use dagsync.ScopedBlockHook to + // override this hook and decode chunks instead. + // + // Therefore, we only attempt to load advertisements here and signal + // failure if the load fails. + prevCid, err := prevAdCid(adCid) + if err != nil { + actions.FailSync(err) + } else { + actions.SetNextSyncCid(prevCid) + } + } +} diff --git a/ingest/schema/types.go b/ingest/schema/types.go index f9c0716..bbfa213 100644 --- a/ingest/schema/types.go +++ b/ingest/schema/types.go @@ -1,12 +1,21 @@ package schema import ( + "bytes" "errors" "fmt" + "io" + "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/multicodec" "github.com/ipld/go-ipld-prime/node/bindnode" "github.com/multiformats/go-multihash" + + // Import so these codecs get registered. + _ "github.com/ipld/go-ipld-prime/codec/dagcbor" + _ "github.com/ipld/go-ipld-prime/codec/dagjson" ) type ( @@ -82,6 +91,15 @@ func UnwrapAdvertisement(node ipld.Node) (*Advertisement, error) { return ad, nil } +// Return the Advertisement's previous CID, or cid.Undef if there is no +// previous CID. +func (a Advertisement) PreviousCid() cid.Cid { + if a.PreviousID == nil { + return cid.Undef + } + return a.PreviousID.(cidlink.Link).Cid +} + func (a Advertisement) Validate() error { if len(a.ContextID) > MaxContextIDLen { return errors.New("context id too long") @@ -94,6 +112,49 @@ func (a Advertisement) Validate() error { return nil } +// BytesToAdvertisement deserializes an Advertisement from a buffer. It does +// not check that the given CID matches the data, as this should have been done +// when the data was acquired. +func BytesToAdvertisement(adCid cid.Cid, data []byte) (Advertisement, error) { + adNode, err := decodeIPLDNode(adCid.Prefix().Codec, bytes.NewBuffer(data), AdvertisementPrototype) + if err != nil { + return Advertisement{}, err + } + ad, err := UnwrapAdvertisement(adNode) + if err != nil { + return Advertisement{}, err + } + return *ad, nil +} + +// BytesToEntryChunk deserializes an EntryChunk from a buffer. It does not +// check that the given CID matches the data, as this should have been done +// when the data was acquired. +func BytesToEntryChunk(entCid cid.Cid, data []byte) (EntryChunk, error) { + entNode, err := decodeIPLDNode(entCid.Prefix().Codec, bytes.NewBuffer(data), EntryChunkPrototype) + if err != nil { + return EntryChunk{}, err + } + ent, err := UnwrapEntryChunk(entNode) + if err != nil { + return EntryChunk{}, err + } + return *ent, nil +} + +// decodeIPLDNode decodes an ipld.Node from bytes read from an io.Reader. +func decodeIPLDNode(codec uint64, r io.Reader, prototype ipld.NodePrototype) (ipld.Node, error) { + nb := prototype.NewBuilder() + decoder, err := multicodec.LookupDecoder(codec) + if err != nil { + return nil, err + } + if err = decoder(nb, r); err != nil { + return nil, err + } + return nb.Build(), nil +} + // UnwrapEntryChunk unwraps the given node as an entry chunk. // // Note that the node is reassigned to EntryChunkPrototype if its prototype is different.