Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function to create general dagsync blockhook function #144

Merged
merged 6 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dagsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,35 @@ func ScopedBlockHook(hook BlockHookFunc) SyncOption {
sc.blockHook = hook
}
}

// MakeGeneralBlockHook creates a block hook function that sets the next sync
// action based on whether the specified advertisement has a previous
// advertisement in the chain..
//
// Use this when segmented sync is enabled and no other blockhook is defined.
//
// The supplied prevAdCid function takes the CID of the current advertisement
// and returns the CID of the previous advertisement in the chain. This would
// typically be done my loading the specified advertisement from the
// ipld.LinkSystem and getting the previous CID.
func MakeGeneralBlockHook(prevAdCid func(adCid cid.Cid) (cid.Cid, error)) BlockHookFunc {
return func(_ peer.ID, adCid cid.Cid, actions SegmentSyncActions) {
// The only kind of block we should get by loading CIDs here should be
// Advertisement.
//
// Because:
// - The default subscription selector only selects advertisements.
// - Entries are synced with an explicit selector separate from
// advertisement syncs and should use dagsync.ScopedBlockHook to
// override this hook and decode chunks instead.
//
// Therefore, we only attempt to load advertisements here and signal
// failure if the load fails.
prevCid, err := prevAdCid(adCid)
if err != nil {
actions.FailSync(err)
} else {
actions.SetNextSyncCid(prevCid)
}
}
}
61 changes: 61 additions & 0 deletions ingest/schema/types.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package schema

import (
"bytes"
"errors"
"fmt"
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/multicodec"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/multiformats/go-multihash"

// Import so these codecs get registered.
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
)

type (
Expand Down Expand Up @@ -82,6 +91,15 @@ func UnwrapAdvertisement(node ipld.Node) (*Advertisement, error) {
return ad, nil
}

// Return the Advertisement's previous CID, or cid.Undef if there is no
// previous CID.
func (a Advertisement) PreviousCid() cid.Cid {
if a.PreviousID == nil {
return cid.Undef
}
return a.PreviousID.(cidlink.Link).Cid
}

func (a Advertisement) Validate() error {
if len(a.ContextID) > MaxContextIDLen {
return errors.New("context id too long")
Expand All @@ -94,6 +112,49 @@ func (a Advertisement) Validate() error {
return nil
}

// BytesToAdvertisement deserializes an Advertisement from a buffer. It does
// not check that the given CID matches the data, as this should have been done
// when the data was acquired.
func BytesToAdvertisement(adCid cid.Cid, data []byte) (Advertisement, error) {
adNode, err := decodeIPLDNode(adCid.Prefix().Codec, bytes.NewBuffer(data), AdvertisementPrototype)
if err != nil {
return Advertisement{}, err
}
ad, err := UnwrapAdvertisement(adNode)
if err != nil {
return Advertisement{}, err
}
return *ad, nil
}

// BytesToEntryChunk deserializes an EntryChunk from a buffer. It does not
// check that the given CID matches the data, as this should have been done
// when the data was acquired.
func BytesToEntryChunk(entCid cid.Cid, data []byte) (EntryChunk, error) {
entNode, err := decodeIPLDNode(entCid.Prefix().Codec, bytes.NewBuffer(data), EntryChunkPrototype)
if err != nil {
return EntryChunk{}, err
}
ent, err := UnwrapEntryChunk(entNode)
if err != nil {
return EntryChunk{}, err
}
return *ent, nil
}

// decodeIPLDNode decodes an ipld.Node from bytes read from an io.Reader.
func decodeIPLDNode(codec uint64, r io.Reader, prototype ipld.NodePrototype) (ipld.Node, error) {
nb := prototype.NewBuilder()
decoder, err := multicodec.LookupDecoder(codec)
if err != nil {
return nil, err
}
if err = decoder(nb, r); err != nil {
return nil, err
}
return nb.Build(), nil
}

// UnwrapEntryChunk unwraps the given node as an entry chunk.
//
// Note that the node is reassigned to EntryChunkPrototype if its prototype is different.
Expand Down