diff --git a/backend/hyper/blockstore.go b/backend/hyper/blockstore.go
deleted file mode 100644
index bfdb4f38..00000000
--- a/backend/hyper/blockstore.go
+++ /dev/null
@@ -1,330 +0,0 @@
-package hyper
-
-import (
-	"context"
-	"fmt"
-	"seed/backend/hyper/hypersql"
-	"seed/backend/ipfs"
-	"seed/backend/util/dqb"
-
-	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
-	blockstore "github.com/ipfs/boxo/blockstore"
-	blocks "github.com/ipfs/go-block-format"
-	"github.com/ipfs/go-cid"
-	format "github.com/ipfs/go-ipld-format"
-	"github.com/klauspost/compress/zstd"
-	"github.com/multiformats/go-multihash"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-)
-
-var (
-	mCallsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "seed_old_ipfs_blockstore_calls_total",
-		Help: "The total number of method calls on the IPFS Blockstore public interface.",
-	}, []string{"method"})
-)
-
-var _ blockstore.Blockstore = (*blockStore)(nil)
-
-// blockStore is an implementation of IPFS Blockstore.
-type blockStore struct {
-	db      *sqlitex.Pool
-	encoder *zstd.Encoder
-	decoder *zstd.Decoder
-}
-
-// newBlockstore creates a new block store from a given connection pool.
-// The corresponding table and columns must be created beforehand.
-// Use DefaultConfig() for default table and column names.
-func newBlockstore(db *sqlitex.Pool) *blockStore {
-	enc, err := zstd.NewWriter(nil)
-	if err != nil {
-		panic(err)
-	}
-
-	dec, err := zstd.NewReader(nil)
-	if err != nil {
-		panic(err)
-	}
-
-	return &blockStore{
-		db:      db,
-		encoder: enc,
-		decoder: dec,
-	}
-}
-
-// Has implements blockstore.Blockstore interface.
-func (b *blockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
-	mCallsTotal.WithLabelValues("Has").Inc()
-
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return false, err
-	}
-	defer release()
-
-	return b.has(conn, c)
-}
-
-func (b *blockStore) has(conn *sqlite.Conn, c cid.Cid) (bool, error) {
-	res, err := hypersql.BlobsHave(conn, c.Hash())
-	if err != nil {
-		return false, err
-	}
-
-	if res.Have == 1 {
-		return true, nil
-	}
-
-	return false, nil
-}
-
-// Get implements blockstore.Blockstore interface.
-func (b *blockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
-	mCallsTotal.WithLabelValues("Get").Inc()
-
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return nil, err
-	}
-	defer release()
-
-	return b.get(conn, c)
-}
-
-func (b *blockStore) get(conn *sqlite.Conn, c cid.Cid) (blocks.Block, error) {
-	res, err := hypersql.BlobsGet(conn, c.Hash())
-	if err != nil {
-		return nil, err
-	}
-
-	if res.BlobsID == 0 {
-		return nil, format.ErrNotFound{Cid: c}
-	}
-
-	// Size 0 means that data is stored inline in the CID.
-	if res.BlobsSize == 0 {
-		return blocks.NewBlockWithCid(nil, c)
-	}
-
-	data, err := b.decompress(res.BlobsData, int(res.BlobsSize))
-	if err != nil {
-		return nil, err
-	}
-
-	return blocks.NewBlockWithCid(data, c)
-}
-
-func (b *blockStore) decompress(data []byte, originalSize int) ([]byte, error) {
-	var err error
-	out := make([]byte, 0, originalSize)
-	out, err = b.decoder.DecodeAll(data, out)
-	if err != nil {
-		return nil, fmt.Errorf("failed to decompress blob: %w", err)
-	}
-	return out, nil
-}
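For reference, a minimal sketch of the compress/decompress round trip this store relies on (zstd's EncodeAll and DecodeAll append to their destination slice; variable names are illustrative):

	enc, _ := zstd.NewWriter(nil)
	dec, _ := zstd.NewReader(nil)
	raw := []byte("some blob payload")
	stored := enc.EncodeAll(raw, nil)                           // what ends up in blobs.data
	back, _ := dec.DecodeAll(stored, make([]byte, 0, len(raw))) // blobs.size provides the capacity hint
	// bytes.Equal(back, raw) == true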
-// GetSize implements blockstore.Blockstore interface.
-func (b *blockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
-	mCallsTotal.WithLabelValues("GetSize").Inc()
-
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return 0, err
-	}
-	defer release()
-
-	res, err := hypersql.BlobsGetSize(conn, c.Hash())
-	if err != nil {
-		return 0, err
-	}
-
-	if res.BlobsID == 0 || res.BlobsSize < 0 {
-		return 0, format.ErrNotFound{Cid: c}
-	}
-
-	return int(res.BlobsSize), nil
-}
-
-// Put implements blockstore.Blockstore interface.
-func (b *blockStore) Put(ctx context.Context, block blocks.Block) error {
-	mCallsTotal.WithLabelValues("Put").Inc()
-
-	return b.withConn(ctx, func(conn *sqlite.Conn) error {
-		return sqlitex.WithTx(conn, func() error {
-			codec, hash := ipfs.DecodeCID(block.Cid())
-			_, _, err := b.putBlock(conn, 0, codec, hash, block.RawData())
-			return err
-		})
-	})
-}
-
-// PutMany implements blockstore.Blockstore interface.
-func (b *blockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
-	mCallsTotal.WithLabelValues("PutMany").Inc()
-
-	return b.withConn(ctx, func(conn *sqlite.Conn) error {
-		return sqlitex.WithTx(conn, func() error {
-			for _, blk := range blocks {
-				codec, hash := ipfs.DecodeCID(blk.Cid())
-				if _, _, err := b.putBlock(conn, 0, codec, hash, blk.RawData()); err != nil {
-					return err
-				}
-			}
-			return nil
-		})
-	})
-}
-
-func (b *blockStore) putBlock(conn *sqlite.Conn, inID int64, codec uint64, hash multihash.Multihash, data []byte) (id int64, exists bool, err error) {
-	size, err := hypersql.BlobsGetSize(conn, hash)
-	if err != nil {
-		return 0, false, err
-	}
-
-	var update bool
-
-	switch {
-	// We have this blob already. Size can be 0 if data is inlined in the CID.
-	case size.BlobsID != 0 && size.BlobsSize >= 0:
-		return size.BlobsID, true, nil
-	// We know about the blob, but we don't have it.
-	case size.BlobsID != 0 && size.BlobsSize < 0:
-		update = true
-	// We don't have nor know anything about the blob.
-	case size.BlobsID == 0 && size.BlobsSize == 0:
-	default:
-		panic("BUG: unhandled blob insert case")
-	}
-
-	var compressed []byte
-	// We store IPFS blocks compressed in the database. But for inline CIDs, there's no data (because it's inline),
-	// hence nothing to compress. It could be that compression doesn't actually bring much benefit; we'd have to
-	// measure at some point whether or not it's useful. As we're storing a lot of text, I assume storage-wise
-	// it should make a difference, but the performance hit needs to be measured.
-	//
-	// TODO(burdiyan): don't compress if original data is <= compressed data.
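A minimal sketch of the TODO above, keeping raw bytes whenever compression doesn't win. maybeCompress is a hypothetical helper, not part of the original file, and persisting the flag would also require a schema change:

	func maybeCompress(enc *zstd.Encoder, data []byte) (out []byte, compressed bool) {
		c := enc.EncodeAll(data, make([]byte, 0, len(data)))
		if len(c) >= len(data) {
			return data, false // compression didn't help; store as-is
		}
		return c, true
	}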
-	if len(data) > 0 {
-		compressed = make([]byte, 0, len(data))
-		compressed = b.encoder.EncodeAll(data, compressed)
-	}
-
-	if update {
-		newID, err := allocateBlobID(conn)
-		if err != nil {
-			return 0, false, err
-		}
-		return newID, false, blobsUpdateMissingData(conn, compressed, int64(len(data)), newID, size.BlobsID)
-	}
-
-	ins, err := hypersql.BlobsInsert(conn, inID, hash, int64(codec), compressed, int64(len(data)))
-	return ins.BlobsID, false, err
-}
-
-func allocateBlobID(conn *sqlite.Conn) (int64, error) {
-	var id int64
-	if err := sqlitex.Exec(conn, qAllocateBlobID(), func(stmt *sqlite.Stmt) error {
-		id = stmt.ColumnInt64(0)
-		return nil
-	}); err != nil {
-		return 0, err
-	}
-
-	if id == 0 {
-		return 0, fmt.Errorf("BUG: couldn't allocate blob ID for some reason")
-	}
-
-	return id, nil
-}
-
-var qAllocateBlobID = dqb.Str(`
-	UPDATE sqlite_sequence
-	SET seq = seq + 1
-	WHERE name = 'blobs'
-	RETURNING seq;
-`)
-
-// blobsUpdateMissingData updates a blob.
-func blobsUpdateMissingData(conn *sqlite.Conn, blobsData []byte, blobsSize int64, newID, blobsID int64) error {
-	return sqlitex.Exec(conn, qBlobsUpdateMissingData(), nil, blobsData, blobsSize, newID, blobsID)
-}
-
-var qBlobsUpdateMissingData = dqb.Str(`
-	UPDATE blobs
-	SET data = :blobsData,
-	    size = :blobsSize,
-	    id = :newID
-	WHERE id = :oldID;
-`)
-
-// DeleteBlock implements blockstore.Blockstore interface.
-func (b *blockStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
-	mCallsTotal.WithLabelValues("DeleteBlock").Inc()
-
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	_, err = b.deleteBlock(conn, c)
-	return err
-}
-
-func (b *blockStore) deleteBlock(conn *sqlite.Conn, c cid.Cid) (oldid int64, err error) {
-	ret, err := hypersql.BlobsDelete(conn, c.Hash())
-	return ret.BlobsID, err
-}
-
-// AllKeysChan implements blockstore.Blockstore interface.
-func (b *blockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
-	mCallsTotal.WithLabelValues("AllKeysChan").Inc()
-
-	c := make(chan cid.Cid, 10) // The buffer is arbitrary.
-
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	list, err := hypersql.BlobsListKnown(conn)
-	if err != nil {
-		return nil, err
-	}
-
-	release()
-
-	go func() {
-		defer close(c)
-
-		for _, l := range list {
-			select {
-			case <-ctx.Done():
-				return
-			case c <- cid.NewCidV1(uint64(l.BlobsCodec), l.BlobsMultihash):
-				// Written successfully.
-			}
-		}
-	}()
-
-	return c, nil
-}
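A quick usage sketch for draining the channel above (bstore stands for any *blockStore; error handling elided):

	ch, _ := bstore.AllKeysChan(ctx)
	for c := range ch {
		fmt.Println(c) // every known CID, rebuilt from the stored codec and multihash
	}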
-// HashOnRead satisfies blockstore.Blockstore interface, but is not actually implemented.
-func (b *blockStore) HashOnRead(bool) {
-	panic("hash on read is not implemented for sqlite blockstore")
-}
-
-func (b *blockStore) withConn(ctx context.Context, fn func(*sqlite.Conn) error) error {
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return fn(conn)
-}
diff --git a/backend/hyper/entity.go b/backend/hyper/entity.go
deleted file mode 100644
index a9c1c98b..00000000
--- a/backend/hyper/entity.go
+++ /dev/null
@@ -1,942 +0,0 @@
-package hyper
-
-import (
-	"context"
-	"crypto/rand"
-	"crypto/sha256"
-	"encoding/binary"
-	"encoding/json"
-	"fmt"
-	"seed/backend/core"
-	"seed/backend/crdt2"
-	"seed/backend/hlc"
-	"seed/backend/hyper/hypersql"
-	"seed/backend/ipfs"
-	"seed/backend/util/dqb"
-	"seed/backend/util/strbytes"
-	"sort"
-	"strings"
-
-	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
-	"github.com/ipfs/go-cid"
-	cbornode "github.com/ipfs/go-ipld-cbor"
-	"github.com/multiformats/go-multibase"
-	"github.com/multiformats/go-multicodec"
-	"github.com/multiformats/go-multihash"
-	"golang.org/x/exp/maps"
-	"golang.org/x/exp/slices"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-)
-
-// EntityID is a type for IDs of mutable entities.
-type EntityID string
-
-// EntityIDFromCID converts a previously CID-encoded Entity ID back into the initial form.
-func EntityIDFromCID(c cid.Cid) (EntityID, error) {
-	codec, hash := ipfs.DecodeCID(c)
-
-	if multicodec.Code(codec) != multicodec.Raw {
-		return "", fmt.Errorf("failed to convert CID %s into entity ID: unsupported codec %s", c, multicodec.Code(codec))
-	}
-
-	mh, err := multihash.Decode(hash)
-	if err != nil {
-		return "", fmt.Errorf("failed to decode multihash from CID %q: %w", c, err)
-	}
-
-	if multicodec.Code(mh.Code) != multicodec.Identity {
-		return "", fmt.Errorf("failed to convert CID %s into entity ID: unsupported hash %s", c, multicodec.Code(mh.Code))
-	}
-
-	return EntityID(mh.Digest), nil
-}
-
-// CID returns the CID representation of the entity ID. Used for announcing on the DHT.
-func (eid EntityID) CID() (cid.Cid, error) {
-	c, err := ipfs.NewCID(uint64(multicodec.Raw), uint64(multicodec.Identity), []byte(eid))
-	if err != nil {
-		return c, fmt.Errorf("failed to convert entity ID %s into CID: %w", eid, err)
-	}
-	return c, nil
-}
-
-// HasPrefix is a convenience function to avoid misplacing arguments
-// for the corresponding function in package strings.
-func (eid EntityID) HasPrefix(prefix string) bool {
-	return strings.HasPrefix(string(eid), prefix)
-}
-
-// TrimPrefix is a convenience function to avoid misplacing arguments
-// for the corresponding function in package strings.
-func (eid EntityID) TrimPrefix(prefix string) string {
-	return strings.TrimPrefix(string(eid), prefix)
-}
-
-// String representation of the entity ID.
-func (eid EntityID) String() string { return string(eid) }
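Entity IDs round-trip losslessly through CIDs, because the raw codec plus identity multihash embeds the ID bytes verbatim in the digest. A hedged sketch (the literal ID is made up):

	eid := EntityID("hm://d/someDocument")
	c, _ := eid.CID()             // raw codec + identity hash; used for DHT announcements
	back, _ := EntityIDFromCID(c) // back == eid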
-// Entity is our CRDT mutable object.
-type Entity struct {
-	id           EntityID
-	changes      []ParsedBlob[Change]
-	deps         [][]int // deps for each change.
-	rdeps        [][]int // reverse deps for each change.
-	applied      map[cid.Cid]int
-	heads        map[cid.Cid]struct{}
-	state        *crdt2.Map
-	maxClock     *hlc.Clock
-	actorsIntern map[string]string
-	vectorClock  map[string]hlc.Timestamp
-}
-
-// NewEntity creates a new entity with a given ID.
-func NewEntity(id EntityID) *Entity {
-	return &Entity{
-		id:           id,
-		applied:      make(map[cid.Cid]int),
-		heads:        make(map[cid.Cid]struct{}),
-		state:        crdt2.NewMap(),
-		maxClock:     hlc.NewClock(),
-		actorsIntern: make(map[string]string),
-		vectorClock:  make(map[string]hlc.Timestamp),
-	}
-}
-
-// NewEntityWithClock creates a new entity with a provided clock.
-func NewEntityWithClock(id EntityID, clock *hlc.Clock) *Entity {
-	e := NewEntity(id)
-	e.maxClock = clock
-	return e
-}
-
-// ID returns the ID of the entity.
-func (e *Entity) ID() EntityID { return e.id }
-
-// Get a property under a given path.
-func (e *Entity) Get(path ...string) (value any, ok bool) {
-	return e.state.Get(path...)
-}
-
-// LastChangeTime is the max time tracked in the HLC.
-func (e *Entity) LastChangeTime() hlc.Timestamp {
-	return e.maxClock.Max()
-}
-
-// AppliedChanges returns the list of applied changes.
-// This must be read-only. Not safe for concurrency.
-func (e *Entity) AppliedChanges() []ParsedBlob[Change] {
-	return e.changes
-}
-
-// State returns the underlying CRDT state of the entity.
-func (e *Entity) State() *crdt2.Map {
-	return e.state
-}
-
-// Heads returns the map of head changes.
-// This must be read-only. Not safe for concurrency.
-func (e *Entity) Heads() map[cid.Cid]struct{} {
-	return e.heads
-}
-
-// Version is a set of head CIDs encoded as a sorted, dot-separated string.
-type Version string
-
-// NewVersion creates a Version from the given head CIDs.
-func NewVersion(cids ...cid.Cid) Version {
-	if len(cids) == 0 {
-		return ""
-	}
-
-	out := make([]string, 0, len(cids))
-	for _, k := range cids {
-		out = append(out, k.String())
-	}
-	sort.Strings(out)
-
-	return Version(strings.Join(out, "."))
-}
-
-// String implements fmt.Stringer.
-func (v Version) String() string { return string(v) }
-
-// Parse decodes the version string back into a list of CIDs.
-func (v Version) Parse() ([]cid.Cid, error) {
-	if v == "" {
-		return nil, nil
-	}
-
-	parts := strings.Split(string(v), ".")
-	out := make([]cid.Cid, len(parts))
-
-	for i, p := range parts {
-		c, err := cid.Decode(p)
-		if err != nil {
-			return nil, fmt.Errorf("failed to parse version: %w", err)
-		}
-		out[i] = c
-	}
-
-	return out, nil
-}
-
-// Version returns the current version of the entity.
-func (e *Entity) Version() Version {
-	if len(e.heads) == 0 {
-		return ""
-	}
-
-	return NewVersion(maps.Keys(e.heads)...)
-}
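Since a Version is just the sorted, dot-joined string form of the head CIDs, a round trip looks like this (c1 and c2 stand for arbitrary head CIDs):

	v := NewVersion(c1, c2) // e.g. "bafy...A.bafy...B", sorted lexicographically
	heads, _ := v.Parse()   // back to []cid.Cid, in string-sorted order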
-// ApplyChange to the internal state.
-func (e *Entity) ApplyChange(c cid.Cid, ch Change) error {
-	if ch.Entity != e.id {
-		return fmt.Errorf("won't apply change from a different entity: want=%q, got=%q", e.id, ch.Entity)
-	}
-
-	if _, ok := e.applied[c]; ok {
-		return fmt.Errorf("change is already applied")
-	}
-
-	var actor string
-	{
-		au := ch.Signer.UnsafeString()
-		a, ok := e.actorsIntern[au]
-		if !ok {
-			e.actorsIntern[au] = au
-			a = au
-		}
-		actor = a
-	}
-
-	if ch.HLCTime < e.vectorClock[actor] {
-		return fmt.Errorf("applying change '%s' violates causal order", c)
-	}
-
-	e.vectorClock[actor] = ch.HLCTime
-
-	if ch.HLCTime < e.maxClock.Max() {
-		return fmt.Errorf("applying change %s out of causal order", c)
-	}
-
-	deps := make([]int, len(ch.Deps))
-
-	for i, dep := range ch.Deps {
-		depIdx, ok := e.applied[dep]
-		if !ok {
-			return fmt.Errorf("missing dependency %s of change %s", dep, c)
-		}
-
-		deps[i] = depIdx
-	}
-
-	if err := e.maxClock.Track(ch.HLCTime); err != nil {
-		return err
-	}
-
-	e.state.ApplyPatch(int64(ch.HLCTime), OriginFromCID(c), ch.Patch)
-	e.changes = append(e.changes, ParsedBlob[Change]{c, ch})
-	e.deps = append(e.deps, nil)
-	e.rdeps = append(e.rdeps, nil)
-	e.heads[c] = struct{}{}
-	curIdx := len(e.changes) - 1
-	e.applied[c] = curIdx
-
-	// One more pass through the deps to update the internal DAG structure,
-	// and update the heads of the current version.
-	// To avoid corrupting the entity state we shouldn't do this in the first loop we did.
-	for i, dep := range ch.Deps {
-		// If any of the deps was a head, then it's no longer the case.
-		delete(e.heads, dep)
-
-		// Keeping the DAG edges between deps in both directions.
-		e.deps[curIdx] = addUnique(e.deps[curIdx], deps[i])
-		e.rdeps[deps[i]] = addUnique(e.rdeps[deps[i]], curIdx)
-	}
-
-	return nil
-}
-
-// Deps returns the set of dependencies for the current heads.
-// This is a bit more complex than just returning the deps of the head changes as is,
-// because they may be redundant in some cases, when they have links between each other.
-// This method returns the minimal set of deps by reducing the redundant edges.
-//
-// Given the following DAG (d, e) are the heads, while (c, b) are the direct deps,
-// although only (c) needs to be returned, as b is already assumed by c.
-//
-//	a ← b ← c ← d
-//	     ↖
-//	       e
-func (e *Entity) Deps() []cid.Cid {
-	if len(e.heads) == 0 {
-		return nil
-	}
-
-	// Special case when there's only one head,
-	// because there's no need to do any reductions.
-	if len(e.heads) == 1 {
-		var head cid.Cid
-		for head = range e.heads {
-			break
-		}
-
-		return slices.Clone(e.changes[e.applied[head]].Data.Deps)
-	}
-
-	// These two sets initially will contain all deps of the heads,
-	// but later the redundant deps will be removed from the reduced set.
-	// We still need to keep the full deps in order to perform the reduction correctly.
-	fullDeps := make(map[int]struct{})
-	reducedDeps := make(map[int]struct{})
-
-	for head := range e.heads {
-		ihead, ok := e.applied[head]
-		if !ok {
-			panic("BUG: head change not applied")
-		}
-
-		for _, dep := range e.deps[ihead] {
-			fullDeps[dep] = struct{}{}
-			reducedDeps[dep] = struct{}{}
-		}
-	}
-
-	// For each collected dep we want to traverse back to the leaves,
-	// and if we find a node along the way that is already a collected dep,
-	// then this current dep is redundant and doesn't need to be returned.
-	var (
-		stack   []int
-		visited = make(map[int]struct{})
-	)
-
-	// Initialize the traversal stack with all the full deps.
-	for dep := range fullDeps {
-		stack = append(stack, dep)
-	}
-
-	// Then for each node in the stack traverse back to the leaves,
-	// breaking early if any of the rdeps is already in the full deps set.
-	for len(stack) > 0 {
-		node := stack[len(stack)-1]
-		stack = stack[:len(stack)-1]
-
-		if _, ok := visited[node]; ok {
-			continue
-		}
-
-		visited[node] = struct{}{}
-		for _, rdep := range e.rdeps[node] {
-			if _, ok := visited[rdep]; !ok {
-				stack = append(stack, rdep)
-			}
-
-			if _, ok := fullDeps[rdep]; ok {
-				delete(reducedDeps, node)
-				break
-			}
-		}
-	}
-
-	out := make([]cid.Cid, 0, len(reducedDeps))
-	for dep := range reducedDeps {
-		out = append(out, e.changes[dep].CID)
-	}
-
-	return out
-}
-
-func addUnique(in []int, v int) []int {
-	// Slice in is very small most of the time,
-	// and is assumed to be sorted.
-	// Our assumption here is that linear search would be faster than binary search,
-	// because most changes have only a few dependencies.
-	targetIndex := len(in) // Insert at the end, unless a bigger element is found below.
-	for i, x := range in {
-		if x == v {
-			return in
-		}
-
-		if x > v {
-			targetIndex = i
-			break
-		}
-	}
-
-	return slices.Insert(in, targetIndex, v)
-}
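Concretely, addUnique keeps the per-change dep slices sorted and deduplicated:

	addUnique([]int{1, 3}, 2) // → [1, 2, 3]
	addUnique([]int{1, 3}, 3) // → [1, 3] (already present)
	addUnique([]int{1, 3}, 4) // → [1, 3, 4] (appended at the end)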
-// OriginFromCID creates a CRDT origin from the last 9 characters
-// of the base58 representation of the CID.
-// Most of the time it's not needed, because HLC is very unlikely to collide.
-func OriginFromCID(c cid.Cid) string {
-	if !c.Defined() {
-		return ""
-	}
-
-	str, err := c.StringOfBase(multibase.Base58BTC)
-	if err != nil {
-		panic(err)
-	}
-	return str[len(str)-9:]
-}
-
-// NextTimestamp returns the next timestamp from the HLC.
-func (e *Entity) NextTimestamp() hlc.Timestamp {
-	return e.maxClock.MustNow()
-}
-
-// ChangeOption is a functional option for creating Changes.
-type ChangeOption func(*Change)
-
-// WithAction sets the action field of the change.
-func WithAction(action string) ChangeOption {
-	return func(c *Change) {
-		c.Action = action
-	}
-}
-
-// WithMessage sets the message field of the change.
-func WithMessage(msg string) ChangeOption {
-	return func(c *Change) {
-		c.Message = msg
-	}
-}
-
-// CreateChange creates a change blob and applies it to the internal state of the entity.
-func (e *Entity) CreateChange(ts hlc.Timestamp, signer core.KeyPair, delegation cid.Cid, patch map[string]any, opts ...ChangeOption) (hb Blob, err error) {
-	hb, err = NewChange(e.id, maps.Keys(e.heads), ts, signer, delegation, patch, opts...)
-	if err != nil {
-		return hb, err
-	}
-
-	if err := e.ApplyChange(hb.CID, hb.Decoded.(Change)); err != nil {
-		return hb, err
-	}
-
-	return hb, nil
-}
-
-// ReplaceChange creates a new change in place of an existing one. The change to replace must be the current head.
-func (e *Entity) ReplaceChange(old cid.Cid, ts hlc.Timestamp, signer core.KeyPair, delegation cid.Cid, patch map[string]any, opts ...ChangeOption) (hb Blob, err error) {
-	if len(e.heads) != 1 {
-		return hb, fmt.Errorf("must only have one head change to replace")
-	}
-
-	if _, ok := e.heads[old]; !ok {
-		return hb, fmt.Errorf("change to replace must be the current head")
-	}
-
-	var prev Change
-	{
-		idx, ok := e.applied[old]
-		if !ok {
-			return hb, fmt.Errorf("change to be replaced must be applied")
-		}
-		prev = e.changes[idx].Data
-	}
-
-	e.state.ForgetState(int64(prev.HLCTime), OriginFromCID(old))
-	delete(e.applied, old)
-	delete(e.heads, old)
-
-	hb, err = NewChange(e.id, prev.Deps, ts, signer, delegation, patch, opts...)
-	if err != nil {
-		return hb, err
-	}
-
-	if err := e.ApplyChange(hb.CID, hb.Decoded.(Change)); err != nil {
-		return hb, err
-	}
-
-	return hb, nil
-}
-
-// SortCIDs sorts multiple CIDs when determinism is needed.
-// The sorting is done in place, and the same slice is returned for convenience.
-func SortCIDs(cids []cid.Cid) []cid.Cid {
-	slices.SortFunc(cids, func(a, b cid.Cid) int { return strings.Compare(a.KeyString(), b.KeyString()) })
-	return cids
-}
-
-// ForEachComment iterates through a target document's comments to manipulate them.
-func (bs *Storage) ForEachComment(ctx context.Context, target string, fn func(c cid.Cid, cmt Comment, conn *sqlite.Conn) error) (err error) {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	defer sqlitex.Save(conn)(&err)
-
-	rdb, err := hypersql.EntitiesLookupID(conn, target)
-	if err != nil {
-		return err
-	}
-	if rdb.ResourcesID == 0 {
-		return fmt.Errorf("resource %s not found: make sure resource ID doesn't have any additional parameters", target)
-	}
-
-	buf := make([]byte, 0, 1024*1024) // preallocating 1MB for decompression.
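(A note on the decoding loops below and in the other Load* methods: zstd's DecodeAll appends to its destination slice, so after each row the code truncates buf back to zero length with buf = buf[:0], keeping the preallocated backing array alive across rows instead of allocating per blob.)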
- err = sqlitex.Exec(conn, qForEachComment(), func(stmt *sqlite.Stmt) error { - var ( - codec = stmt.ColumnInt64(0) - hash = stmt.ColumnBytesUnsafe(1) - data = stmt.ColumnBytesUnsafe(2) - ) - - buf, err = bs.bs.decoder.DecodeAll(data, buf) - if err != nil { - return err - } - - chcid := cid.NewCidV1(uint64(codec), hash) - var cmt Comment - if err := cbornode.DecodeInto(buf, &cmt); err != nil { - return fmt.Errorf("forEachComment: failed to decode comment %s for target %s: %w", chcid, target, err) - } - - if err := fn(chcid, cmt, conn); err != nil { - return err - } - - buf = buf[:0] // reset the slice reusing the backing array - - return nil - }, rdb.ResourcesID) - if err != nil { - return err - } - - return nil -} - -var qForEachComment = dqb.Str(` - SELECT - blobs.codec, - blobs.multihash, - blobs.data - FROM resource_links - JOIN blobs ON blobs.id = resource_links.source - WHERE resource_links.target = :resource - AND resource_links.type = 'comment/target'; -`) - -func (bs *Storage) ForEachChange(ctx context.Context, eid EntityID, fn func(c cid.Cid, ch Change) error) (err error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return err - } - defer release() - - defer sqlitex.Save(conn)(&err) - - edb, err := hypersql.EntitiesLookupID(conn, string(eid)) - if err != nil { - return err - } - if edb.ResourcesID == 0 { - return status.Errorf(codes.NotFound, "entity %q not found", eid) - } - - changes, err := hypersql.ChangesListForEntity(conn, string(eid)) - if err != nil { - return err - } - - buf := make([]byte, 0, 1024*1024) // preallocating 1MB for decompression. - for _, change := range changes { - buf, err = bs.bs.decoder.DecodeAll(change.BlobsData, buf) - if err != nil { - return err - } - - chcid := cid.NewCidV1(uint64(change.BlobsCodec), change.BlobsMultihash) - var ch Change - if err := cbornode.DecodeInto(buf, &ch); err != nil { - return fmt.Errorf("forEachChange: failed to decode change %s for entity %s: %w", chcid, eid, err) - } - - if err := fn(chcid, ch); err != nil { - return err - } - - buf = buf[:0] // reset the slice reusing the backing array - } - - return nil -} - -// LoadEntityAll loads the entity with all the changes. -// -// TODO(burdiyan): DRY out all the loading methods. -func (bs *Storage) LoadEntityAll(ctx context.Context, eid EntityID) (e *Entity, err error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - defer sqlitex.Save(conn)(&err) - - edb, err := hypersql.EntitiesLookupID(conn, string(eid)) - if err != nil { - return nil, err - } - if edb.ResourcesID == 0 { - return nil, status.Errorf(codes.NotFound, "entity %q not found", eid) - } - - entity := NewEntity(eid) - buf := make([]byte, 0, 1024*1024) // preallocating 1MB for decompression. - if err := sqlitex.Exec(conn, qLoadEntityAll(), func(stmt *sqlite.Stmt) error { - var ( - codec = stmt.ColumnInt64(0) - hash = stmt.ColumnBytesUnsafe(1) - data = stmt.ColumnBytesUnsafe(2) - ) - - buf, err = bs.bs.decoder.DecodeAll(data, buf) - if err != nil { - return err - } - - c := cid.NewCidV1(uint64(codec), hash) - var ch Change - if err := cbornode.DecodeInto(buf, &ch); err != nil { - return fmt.Errorf("loadEntity: failed to decode change %q for entity %q: %w", c, eid, err) - } - - if err := entity.ApplyChange(c, ch); err != nil { - return err - } - - // Reset the slice to reuse the underlying array for the next decompression. 
- buf = buf[:0] - return nil - }, edb.ResourcesID); err != nil { - return nil, err - } - // TODO(burdiyan): this is not a great way to handle not found errors. - // But in a lot of places we rely on that behavior, which was more of an accident. - // Need to clean up at some point. - if len(entity.changes) == 0 { - return nil, nil - } - - return entity, nil -} - -var qLoadEntityAll = dqb.Str(` - SELECT - blobs.codec, - blobs.multihash, - blobs.data - FROM structural_blobs - JOIN blobs ON blobs.id = structural_blobs.id - LEFT JOIN drafts ON drafts.resource = structural_blobs.resource AND drafts.blob = structural_blobs.id - WHERE structural_blobs.type = 'Change' - AND structural_blobs.resource = :entity - AND drafts.blob IS NULL - ORDER BY structural_blobs.ts; -`) - -// LoadEntity from the database. If not found returns nil result and nil error. -// It returns the latest version as per the owner of the entity. -func (bs *Storage) LoadEntity(ctx context.Context, eid EntityID) (e *Entity, err error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - defer sqlitex.Save(conn)(&err) - - edb, err := hypersql.EntitiesLookupID(conn, string(eid)) - if err != nil { - return nil, err - } - if edb.ResourcesID == 0 { - return nil, status.Errorf(codes.NotFound, "entity %q not found", eid) - } - - entity := NewEntity(eid) - buf := make([]byte, 0, 1024*1024) // preallocating 1MB for decompression. - if err := sqlitex.Exec(conn, qLoadEntity(), func(stmt *sqlite.Stmt) error { - var ( - codec = stmt.ColumnInt64(0) - hash = stmt.ColumnBytesUnsafe(1) - data = stmt.ColumnBytesUnsafe(2) - ) - - buf, err = bs.bs.decoder.DecodeAll(data, buf) - if err != nil { - return err - } - - c := cid.NewCidV1(uint64(codec), hash) - var ch Change - if err := cbornode.DecodeInto(buf, &ch); err != nil { - return fmt.Errorf("loadEntity: failed to decode change %q for entity %q: %w", c, eid, err) - } - - if err := entity.ApplyChange(c, ch); err != nil { - return err - } - - // Reset the slice to reuse the underlying array for the next decompression. - buf = buf[:0] - return nil - }, edb.ResourcesID); err != nil { - return nil, err - } - // TODO(burdiyan): this is not a great way to handle not found errors. - // But in a lot of places we rely on that behavior, which was more of an accident. - // Need to clean up at some point. - if len(entity.changes) == 0 { - return nil, nil - } - - return entity, nil -} - -// In this query we first collect the blobs authored by the owner of the entity, -// then resolve their transitive dependencies, -// and then we finally join with the actual blob data. -var qLoadEntity = dqb.Str(` - WITH RECURSIVE selected (id) AS ( - SELECT structural_blobs.id - FROM structural_blobs - JOIN resources ON structural_blobs.resource = resources.id - LEFT JOIN drafts ON drafts.resource = structural_blobs.resource AND drafts.blob = structural_blobs.id - WHERE structural_blobs.type = 'Change' - AND structural_blobs.resource = :entity - AND structural_blobs.author = resources.owner - AND drafts.blob IS NULL - UNION - SELECT change_deps.parent - FROM selected - JOIN change_deps ON change_deps.child = selected.id - ) - SELECT - blobs.codec, - blobs.multihash, - blobs.data - FROM selected - -- Using cross join here to force the query planner to use the primary index on blobs. - -- Otherwise, for some reason the query planner chooses to scan over all the blobs table - -- probably because it assumes that the CTE is large and doesn't have indexes. 
- -- But that's OK in this case, because it's a recursive query that we'll have to scan entirely anyway. - CROSS JOIN blobs ON blobs.id = selected.id - JOIN structural_blobs ON structural_blobs.id = selected.id - ORDER BY structural_blobs.ts; -`) - -type Draft struct { - *Entity - CID cid.Cid - Change Change -} - -func (bs *Storage) LoadDraft(ctx context.Context, eid EntityID) (*Draft, error) { - // load draft change - c, err := bs.FindDraft(ctx, eid) - if err != nil { - return nil, err - } - - var ch Change - if err := bs.LoadBlob(ctx, c, &ch); err != nil { - return nil, err - } - - var entity *Entity - if len(ch.Deps) == 0 { - entity = NewEntity(eid) - } else { - e, err := bs.LoadEntityFromHeads(ctx, eid, ch.Deps...) - if err != nil { - return nil, err - } - entity = e - } - - if entity == nil { - return nil, nil - } - - if err := entity.maxClock.Track(ch.HLCTime); err != nil { - return nil, err - } - - return &Draft{ - Entity: entity, - CID: c, - Change: ch, - }, nil -} - -// LoadDraftEntity includes draft changes. -func (bs *Storage) LoadDraftEntity(ctx context.Context, eid EntityID) (*Entity, error) { - draft, err := bs.FindDraft(ctx, eid) - if err != nil { - return nil, err - } - - return bs.LoadEntityFromHeads(ctx, eid, draft) -} - -// FindDraft for a given entity. -func (bs *Storage) FindDraft(ctx context.Context, eid EntityID) (cid.Cid, error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return cid.Undef, err - } - defer release() - - res, err := hypersql.DraftsGet(conn, string(eid)) - if err != nil { - return cid.Undef, err - } - if res.DraftsViewBlobID == 0 { - return cid.Undef, fmt.Errorf("no draft for entity %s", eid) - } - - return cid.NewCidV1(uint64(res.DraftsViewCodec), res.DraftsViewMultihash), nil -} - -// LoadEntityFromHeads returns the loaded entity at a given "version" corresponding to the provided HEAD changes. -func (bs *Storage) LoadEntityFromHeads(ctx context.Context, eid EntityID, heads ...cid.Cid) (e *Entity, err error) { - if len(heads) == 0 { - return nil, fmt.Errorf("must specify heads to load: %s", eid) - } - - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - defer sqlitex.Save(conn)(&err) - - dbheads := make([]int64, 0, len(heads)) - for _, c := range heads { - res, err := hypersql.BlobsGetSize(conn, c.Hash()) - if err != nil { - return nil, err - } - if res.BlobsID == 0 || res.BlobsSize < 0 { - return nil, status.Errorf(codes.NotFound, "no such head %s for entity %s", c, eid) - } - dbheads = append(dbheads, res.BlobsID) - } - - if len(dbheads) != len(heads) { - return nil, fmt.Errorf("couldn't resolve all the heads %v for entity %s", heads, eid) - } - - jsonheads, err := json.Marshal(dbheads) - if err != nil { - return nil, err - } - - return bs.loadFromHeads(conn, eid, localHeads(strbytes.String(jsonheads))) -} - -// localHeads is a JSON-encoded array of integers corresponding to heads. -type localHeads string - -func (bs *Storage) loadFromHeads(conn *sqlite.Conn, eid EntityID, heads localHeads) (e *Entity, err error) { - if heads == "" || heads == "null" { - heads = "[]" - } - - cset, err := hypersql.ChangesResolveHeads(conn, string(heads)) - if err != nil { - return nil, err - } - - changes, err := hypersql.ChangesListFromChangeSet(conn, cset.ResolvedJSON, string(eid)) - if err != nil { - return nil, err - } - if len(changes) == 0 { - return nil, nil - } - - entity := NewEntity(eid) - buf := make([]byte, 0, 1024*1024) // preallocating 1MB for decompression. 
-	for _, change := range changes {
-		buf, err = bs.bs.decoder.DecodeAll(change.StructuralBlobsViewData, buf)
-		if err != nil {
-			return nil, err
-		}
-
-		chcid := cid.NewCidV1(uint64(change.StructuralBlobsViewCodec), change.StructuralBlobsViewMultihash)
-		var ch Change
-		if err := cbornode.DecodeInto(buf, &ch); err != nil {
-			return nil, fmt.Errorf("loadFromHeads: failed to decode change %s for entity %s: %w", chcid, eid, err)
-		}
-
-		if err := entity.ApplyChange(chcid, ch); err != nil {
-			return nil, err
-		}
-
-		buf = buf[:0] // reset the slice reusing the backing array
-	}
-
-	return entity, nil
-}
-
-// ParsedBlob is a decoded IPLD blob.
-type ParsedBlob[T any] struct {
-	CID  cid.Cid
-	Data T
-}
-
-// NewUnforgeableID creates a new random ID that is verifiable with the author's public key.
-// It returns the ID and the nonce. The nonce argument can be nil, in which case a new nonce will be created.
-// Otherwise the same nonce will be returned.
-func NewUnforgeableID(prefix string, author core.Principal, nonce []byte, ts int64) (string, []byte) {
-	const hashSize = 22
-
-	if nonce == nil {
-		nonce = make([]byte, 16)
-		_, err := rand.Read(nonce)
-		if err != nil {
-			panic(err)
-		}
-	}
-
-	h := sha256.New()
-	if _, err := h.Write(author); err != nil {
-		panic(err)
-	}
-	if _, err := h.Write(nonce); err != nil {
-		panic(err)
-	}
-
-	var buf [8]byte
-	binary.BigEndian.PutUint64(buf[:], uint64(ts))
-
-	if _, err := h.Write(buf[:]); err != nil {
-		panic(err)
-	}
-
-	dig := h.Sum(nil)
-	base, err := multibase.Encode(multibase.Base58BTC, dig)
-	if err != nil {
-		panic(err)
-	}
-
-	// Using last [hashSize] characters to avoid multibase prefix,
-	// and reduce the size of the resulting ID.
-	// We don't use the full hash digest here, to make our IDs shorter.
-	// But it should have enough collision resistance for our purpose.
-	return prefix + base[len(base)-hashSize:], nonce
-}
-
-func verifyUnforgeableID(id EntityID, prefix int, owner core.Principal, nonce []byte, ts int64) error {
-	id2, _ := NewUnforgeableID(string(id[:prefix]), owner, nonce, ts)
-	if id2 != string(id) {
-		return fmt.Errorf("failed to verify unforgeable ID want=%q got=%q", id, id2)
-	}
-
-	return nil
-}
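A hedged sketch of minting and then verifying such an ID (author stands for any core.Principal, ts for a Unix timestamp; the prefix length matches idPrefixLen defined in indexing.go):

	id, nonce := NewUnforgeableID("hm://d/", author, nil, ts)
	// Later, anyone holding the owner key, nonce and create time can check the ID:
	err := verifyUnforgeableID(EntityID(id), idPrefixLen, author, nonce, ts)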
diff --git a/backend/hyper/hyper.go b/backend/hyper/hyper.go
deleted file mode 100644
index 4a33a4c0..00000000
--- a/backend/hyper/hyper.go
+++ /dev/null
@@ -1,727 +0,0 @@
-// Package hyper implements Seed Hypermedia System.
-package hyper
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"seed/backend/core"
-	"seed/backend/hyper/hypersql"
-	"seed/backend/ipfs"
-	"seed/backend/util/dqb"
-	"seed/backend/util/must"
-
-	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
-	"github.com/fxamacker/cbor/v2"
-	lru "github.com/hashicorp/golang-lru/v2"
-	"github.com/ipfs/boxo/blockstore"
-	blocks "github.com/ipfs/go-block-format"
-	"github.com/ipfs/go-cid"
-	cbornode "github.com/ipfs/go-ipld-cbor"
-	format "github.com/ipfs/go-ipld-format"
-	dagpb "github.com/ipld/go-codec-dagpb"
-	"github.com/multiformats/go-multicodec"
-	"go.uber.org/zap"
-)
-
-var (
-	// ErrEntityNotFound used when looking for an entity not present in the database.
-	ErrEntityNotFound = errors.New("entity not found")
-)
-
-// BlobType is a named type for Seed Terra Blobs.
-type BlobType string
-
-// Storage is an indexing blob storage.
-type Storage struct {
-	db  *sqlitex.Pool
-	bs  *indexingBlockStore
-	log *zap.Logger
-
-	delegationCache *lru.Cache[cid.Cid, core.Principal]
-	*indexer
-}
-
-// NewStorage creates a new blob storage.
-func NewStorage(db *sqlitex.Pool, log *zap.Logger) *Storage {
-	bs := newBlockstore(db)
-
-	idx := &indexer{
-		db:  db,
-		log: log,
-		bs:  bs,
-	}
-
-	return &Storage{
-		db:              db,
-		bs:              &indexingBlockStore{blockStore: bs, indexBlob: idx.indexBlob},
-		log:             log,
-		indexer:         idx,
-		delegationCache: must.Do2(lru.New[cid.Cid, core.Principal](256)),
-	}
-}
-
-// Query allows executing raw SQLite queries.
-func (bs *Storage) Query(ctx context.Context, fn func(conn *sqlite.Conn) error) (err error) {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	// TODO(burdiyan): make the main database read-only.
-	// This is commented because we want to allow writing into an attached in-memory database
-	// while keeping the main database read-only. Apparently this is not possible in SQLite.
-	// There're a bunch of other ways to achieve this but there's currently no time for implementing them.
-	//
-	// if err := sqlitex.ExecTransient(conn, "PRAGMA query_only = on;", nil); err != nil {
-	// 	return err
-	// }
-	// defer func() {
-	// 	err = multierr.Combine(err, sqlitex.ExecTransient(conn, "PRAGMA query_only = off;", nil))
-	// }()
-
-	return fn(conn)
-}
-
-// SaveBlob into the internal storage. Index if necessary.
-func (bs *Storage) SaveBlob(ctx context.Context, blob Blob) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		_, err := bs.saveBlob(conn, blob)
-		return err
-	})
-}
-
-// saveBlob must be called within a transaction.
-func (bs *Storage) saveBlob(conn *sqlite.Conn, blob Blob) (id int64, err error) {
-	codec, hash := ipfs.DecodeCID(blob.CID)
-	id, exists, err := bs.bs.putBlock(conn, 0, uint64(codec), hash, blob.Data)
-	if err != nil {
-		return 0, err
-	}
-
-	// No need to index if exists.
-	if exists {
-		return id, nil
-	}
-
-	if err := bs.indexBlob(conn, id, blob.CID, blob.Decoded); err != nil {
-		return 0, fmt.Errorf("failed to index blob %s: %w", blob.CID, err)
-	}
-
-	return id, nil
-}
-
-// SetAccountTrust sets an account to trusted.
-func (bs *Storage) SetAccountTrust(ctx context.Context, acc []byte) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		return hypersql.SetAccountTrust(conn, acc)
-	})
-}
-
-// UnsetAccountTrust untrusts the provided account.
-func (bs *Storage) UnsetAccountTrust(ctx context.Context, acc []byte) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		return hypersql.UnsetAccountTrust(conn, acc)
-	})
-}
-
-// SaveDraftBlob saves a draft blob for the given entity.
-func (bs *Storage) SaveDraftBlob(ctx context.Context, eid EntityID, blob Blob) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		id, err := bs.saveBlob(conn, blob)
-		if err != nil {
-			return err
-		}
-
-		if id == 0 {
-			panic("BUG: saveDraft: didn't save draft blob for some reason")
-		}
-
-		resp, err := hypersql.EntitiesLookupID(conn, string(eid))
-		if err != nil {
-			return err
-		}
-		if resp.ResourcesID == 0 {
-			panic("BUG: saveDraft: failed to lookup entity after inserting the blob")
-		}
-
-		return hypersql.DraftsInsert(conn, resp.ResourcesID, id)
-	})
-}
-
-// ListEntities returns a list of entities matching the pattern.
-func (bs *Storage) ListEntities(ctx context.Context, pattern string) ([]EntityID, error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - resp, err := hypersql.EntitiesListByPrefix(conn, pattern) - if err != nil { - return nil, err - } - if len(resp) == 0 { - return nil, nil - } - - out := make([]EntityID, len(resp)) - for i, r := range resp { - out[i] = EntityID(r.ResourcesIRI) - } - - return out, nil -} - -// ListTrustedEntities returns a list of entities matching the pattern owned by trusted accounts. -func (bs *Storage) ListTrustedEntities(ctx context.Context, pattern string) ([]EntityID, error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - var out []EntityID - if err := sqlitex.Exec(conn, qListTrustedEntitites(), func(stmt *sqlite.Stmt) error { - out = append(out, EntityID(stmt.ColumnText(0))) - return nil - }, pattern); err != nil { - return nil, err - } - - return out, nil -} - -var qListTrustedEntitites = dqb.Str(` - SELECT resources.iri - FROM trusted_accounts - JOIN resources ON resources.owner = trusted_accounts.id - WHERE resources.iri GLOB :prefix - ORDER BY resources.id -`) - -func (bs *Storage) GetDraft(ctx context.Context, eid EntityID) (ch Change, err error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return ch, err - } - defer release() - - res, err := hypersql.DraftsGet(conn, string(eid)) - if err != nil { - return ch, err - } - if res.DraftsViewBlobID == 0 { - return ch, fmt.Errorf("no draft for entity %s", eid) - } - - if err := bs.LoadBlob(ctx, cid.NewCidV1(uint64(res.DraftsViewCodec), res.DraftsViewMultihash), &ch); err != nil { - return ch, err - } - - return ch, nil -} - -// PublishBlob publishes a blob. -func (bs *Storage) PublishBlob(ctx context.Context, c cid.Cid) (cid.Cid, error) { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return cid.Undef, err - } - defer release() - - var out cid.Cid - if err := sqlitex.WithTx(conn, func() error { - newID, err := allocateBlobID(conn) - if err != nil { - return err - } - - res, err := hypersql.BlobsGet(conn, c.Hash()) - if err != nil { - return err - } - - if err := sqlitex.Exec(conn, qBlobsTouch(), nil, newID, res.BlobsID); err != nil { - return err - } - - out = cid.NewCidV1(uint64(res.BlobsCodec), res.BlobsMultihash) - - return nil - }); err != nil { - return cid.Undef, err - } - - if !out.Defined() { - return cid.Undef, fmt.Errorf("BUG: got draft without CID") - } - - return out, nil -} - -// PublishDraft publishes a draft. 
-func (bs *Storage) PublishDraft(ctx context.Context, eid EntityID) (cid.Cid, error) {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return cid.Undef, err
-	}
-	defer release()
-
-	var out cid.Cid
-	if err := sqlitex.WithTx(conn, func() error {
-		res, err := hypersql.DraftsGet(conn, string(eid))
-		if err != nil {
-			return err
-		}
-		if res.DraftsViewBlobID == 0 {
-			return fmt.Errorf("no draft to publish for entity %s", eid)
-		}
-
-		if err := hypersql.DraftsDelete(conn, res.DraftsViewBlobID); err != nil {
-			return err
-		}
-
-		newID, err := allocateBlobID(conn)
-		if err != nil {
-			return err
-		}
-
-		if err := sqlitex.Exec(conn, qBlobsTouch(), nil, newID, res.DraftsViewBlobID); err != nil {
-			return err
-		}
-
-		out = cid.NewCidV1(uint64(res.DraftsViewCodec), res.DraftsViewMultihash)
-
-		return nil
-	}); err != nil {
-		return cid.Undef, err
-	}
-
-	if !out.Defined() {
-		return cid.Undef, fmt.Errorf("BUG: got draft without CID")
-	}
-
-	return out, nil
-}
-
-var qBlobsTouch = dqb.Str(`
-	UPDATE blobs
-	SET id = :newID
-	WHERE id = :oldID;
-`)
-
-// DeleteDraft removes the draft blob for the given entity.
-func (bs *Storage) DeleteDraft(ctx context.Context, eid EntityID) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		res, err := hypersql.DraftsGet(conn, string(eid))
-		if err != nil {
-			return err
-		}
-		if res.DraftsViewBlobID == 0 {
-			return fmt.Errorf("no draft to delete for entity %s", eid)
-		}
-
-		if err := hypersql.DraftsDelete(conn, res.DraftsViewBlobID); err != nil {
-			return err
-		}
-
-		_, err = hypersql.BlobsDelete(conn, res.DraftsViewMultihash)
-		if err != nil {
-			return err
-		}
-
-		// Trying to delete the entity. It will fail if there're more changes left for it.
-		err = hypersql.EntitiesDelete(conn, string(eid))
-		if sqlite.ErrCode(err) == sqlite.SQLITE_CONSTRAINT_FOREIGNKEY {
-			return nil
-		}
-		return err
-	})
-}
-
-// DeleteEntity deletes an entity from the database.
-func (bs *Storage) DeleteEntity(ctx context.Context, eid EntityID) error {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		edb, err := hypersql.EntitiesLookupID(conn, string(eid))
-		if err != nil {
-			return fmt.Errorf("%w. problem with the query: %s", ErrEntityNotFound, err.Error())
-		}
-		if edb.ResourcesID == 0 {
-			return fmt.Errorf("%w: %s", ErrEntityNotFound, eid)
-		}
-
-		if err := hypersql.ChangesDeleteForEntity(conn, edb.ResourcesID); err != nil {
-			return err
-		}
-
-		return hypersql.EntitiesDelete(conn, string(eid))
-	})
-}
-
-// ReplaceDraftBlob replaces the current draft blob of an entity with a new one.
-func (bs *Storage) ReplaceDraftBlob(ctx context.Context, eid EntityID, old cid.Cid, blob Blob) error {
-	if !old.Defined() {
-		return fmt.Errorf("BUG: can't replace: old CID is not defined")
-	}
-
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		oldid, err := bs.bs.deleteBlock(conn, old)
-		if err != nil {
-			return err
-		}
-
-		codec, hash := ipfs.DecodeCID(blob.CID)
-
-		id, exists, err := bs.bs.putBlock(conn, oldid, uint64(codec), hash, blob.Data)
-		if err != nil {
-			return fmt.Errorf("replace draft blob error when insert: %w", err)
-		}
-
-		// No need to index if exists.
- if exists { - return nil - } - - if err := bs.indexBlob(conn, id, blob.CID, blob.Decoded); err != nil { - return fmt.Errorf("failed to index blob %s: %w", blob.CID, err) - } - - resp, err := hypersql.EntitiesLookupID(conn, string(eid)) - if err != nil { - return err - } - if resp.ResourcesID == 0 { - panic("BUG: replaceDraft: failed to lookup entity after inserting the blob") - } - - return hypersql.DraftsInsert(conn, resp.ResourcesID, id) - }) -} - -func (bs *Storage) LoadBlob(ctx context.Context, c cid.Cid, v any) error { - codec, _ := ipfs.DecodeCID(c) - if codec != uint64(multicodec.DagCbor) { - return fmt.Errorf("TODO: can't load non-cbor blobs") - } - - blk, err := bs.bs.Get(ctx, c) - if err != nil { - return err - } - - if err := cbornode.DecodeInto(blk.RawData(), v); err != nil { - return fmt.Errorf("failed to decode CBOR blob %s: %w", c, err) - } - - return nil -} - -// GetDelegationIssuer returns the issuer of the given key delegation. -func (bs *Storage) GetDelegationIssuer(ctx context.Context, c cid.Cid) (core.Principal, error) { - if v, ok := bs.delegationCache.Get(c); ok { - return v, nil - } - - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return nil, err - } - defer release() - - iss, err := hypersql.KeyDelegationsGetIssuer(conn, c.Hash()) - if err != nil { - return nil, err - } - if iss.KeyDelegationsIssuer == 0 { - return nil, fmt.Errorf("key delegation %s not found", c) - } - - pk, err := hypersql.PublicKeysLookupPrincipal(conn, iss.KeyDelegationsIssuer) - if err != nil { - return nil, err - } - if pk.PublicKeysPrincipal == nil { - return nil, fmt.Errorf("BUG: public key not found for issuer of %s", c) - } - - bs.delegationCache.Add(c, pk.PublicKeysPrincipal) - - return pk.PublicKeysPrincipal, nil -} - -type IPFSBlockstoreReader interface { - Has(context.Context, cid.Cid) (bool, error) - Get(context.Context, cid.Cid) (blocks.Block, error) - GetSize(context.Context, cid.Cid) (int, error) -} - -func (bs *Storage) IPFSBlockstoreReader() IPFSBlockstoreReader { - return bs.bs -} - -func (bs *Storage) IPFSBlockstore() blockstore.Blockstore { - return bs.bs -} - -// Blob is a structural artifact. -type Blob struct { - CID cid.Cid - Data []byte - Decoded any -} - -// EncodeBlob produces a Blob from any object. -func EncodeBlob(v any) (hb Blob, err error) { - data, err := cbornode.DumpObject(v) - if err != nil { - return hb, fmt.Errorf("failed to encode blob %T: %w", v, err) - } - - blk := ipfs.NewBlock(uint64(multicodec.DagCbor), data) - c := blk.Cid() - - return Blob{ - CID: c, - Data: data, - Decoded: v, - }, nil -} - -var errNotHyperBlob = errors.New("not a hyper blob") - -// DecodeBlob attempts to infer hyper Blob information from arbitrary IPFS block. 
-func DecodeBlob(c cid.Cid, data []byte) (hb Blob, err error) { - codec := c.Prefix().Codec - - switch multicodec.Code(codec) { - case multicodec.DagPb: - b := dagpb.Type.PBNode.NewBuilder() - if err := dagpb.DecodeBytes(b, data); err != nil { - return hb, fmt.Errorf("failed to decode dagpb node %s: %w", c, err) - } - - hb.Decoded = b.Build() - case multicodec.DagCbor: - var v struct { - Type string `cbor:"@type"` - } - if err := cbor.Unmarshal(data, &v); err != nil { - return hb, fmt.Errorf("failed to infer hyper blob %s: %w", c, err) - } - - switch BlobType(v.Type) { - case TypeKeyDelegation: - var v KeyDelegation - if err := cbornode.DecodeInto(data, &v); err != nil { - return hb, err - } - hb.Decoded = v - case TypeChange: - var v Change - if err := cbornode.DecodeInto(data, &v); err != nil { - return hb, err - } - hb.Decoded = v - case TypeComment: - var v Comment - if err := cbornode.DecodeInto(data, &v); err != nil { - return hb, err - } - hb.Decoded = v - default: - return hb, fmt.Errorf("unknown hyper blob type: '%s'", v.Type) - } - default: - return hb, fmt.Errorf("%s: %w", c, errNotHyperBlob) - } - - hb.CID = c - hb.Data = data - - return hb, nil -} - -type indexingBlockStore struct { - *blockStore - indexBlob func(conn *sqlite.Conn, id int64, c cid.Cid, blob any) error -} - -// The following Get methods are wrapped to make sure -// we can respond to BitSwap requests asking for our CID-encoded Entity IDs. - -func (b *indexingBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - eid, err := EntityIDFromCID(c) - if err != nil { - return b.blockStore.Get(ctx, c) - } - - ok, err := b.checkEntityExists(ctx, eid) - if err != nil { - return nil, err - } - - if !ok { - return nil, format.ErrNotFound{Cid: c} - } - - return blocks.NewBlockWithCid(nil, c) -} - -func (b *indexingBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - eid, err := EntityIDFromCID(c) - if err != nil { - return b.blockStore.GetSize(ctx, c) - } - - ok, err := b.checkEntityExists(ctx, eid) - if err != nil { - return 0, err - } - - if !ok { - return 0, format.ErrNotFound{Cid: c} - } - - return 0, nil -} - -func (b *indexingBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { - eid, err := EntityIDFromCID(c) - if err != nil { - return b.blockStore.Has(ctx, c) - } - - ok, err := b.checkEntityExists(ctx, eid) - if err != nil { - return false, err - } - - return ok, nil -} - -func (b *indexingBlockStore) checkEntityExists(ctx context.Context, eid EntityID) (exists bool, err error) { - conn, release, err := b.db.Conn(ctx) - if err != nil { - return false, err - } - defer release() - - res, err := hypersql.EntitiesLookupID(conn, string(eid)) - if err != nil { - return false, err - } - - if res.ResourcesID == 0 { - return false, nil - } - - return hypersql.CheckEntityHasChanges(conn, res.ResourcesID) -} - -// The following methods are wrapped -// to make sure all the blobs -// coming into the blockstore -// from the outside are indexed. 
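These wrappers lean on DecodeBlob above to sniff the blob type before indexing; the round trip with EncodeBlob looks roughly like this (ch stands for any Change value):

	hb, _ := EncodeBlob(ch)                 // dag-cbor block, CID derived from the bytes
	again, _ := DecodeBlob(hb.CID, hb.Data) // dispatches on the CBOR "@type" field
	// again.Decoded.(Change) is equivalent to ch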
-
-func (b *indexingBlockStore) Put(ctx context.Context, block blocks.Block) error {
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		codec, hash := ipfs.DecodeCID(block.Cid())
-		id, exists, err := b.putBlock(conn, 0, codec, hash, block.RawData())
-		if err != nil {
-			return err
-		}
-
-		if exists || !isIndexable(multicodec.Code(codec)) {
-			return nil
-		}
-
-		hb, err := DecodeBlob(block.Cid(), block.RawData())
-		if err != nil {
-			return err
-		}
-		return b.indexBlob(conn, id, hb.CID, hb.Decoded)
-	})
-}
-
-// PutMany implements blockstore.Blockstore interface.
-func (b *indexingBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
-	conn, release, err := b.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return sqlitex.WithTx(conn, func() error {
-		for _, blk := range blocks {
-			codec, hash := ipfs.DecodeCID(blk.Cid())
-			id, exists, err := b.putBlock(conn, 0, codec, hash, blk.RawData())
-			if err != nil {
-				return err
-			}
-
-			if exists || !isIndexable(multicodec.Code(codec)) {
-				continue
-			}
-
-			hb, err := DecodeBlob(blk.Cid(), blk.RawData())
-			if err != nil {
-				return err
-			}
-
-			if err := b.indexBlob(conn, id, hb.CID, hb.Decoded); err != nil {
-				return err
-			}
-		}
-
-		return nil
-	})
-}
diff --git a/backend/hyper/indexing.go b/backend/hyper/indexing.go
deleted file mode 100644
index 28993a46..00000000
--- a/backend/hyper/indexing.go
+++ /dev/null
@@ -1,762 +0,0 @@
-package hyper
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"fmt"
-	"net/url"
-	"seed/backend/core"
-	documents "seed/backend/genproto/documents/v1alpha"
-	groups "seed/backend/genproto/groups/v1alpha"
-	"seed/backend/hyper/hypersql"
-	"seed/backend/storage"
-	"strings"
-	"time"
-
-	"crawshaw.io/sqlite"
-	"crawshaw.io/sqlite/sqlitex"
-	"github.com/ipfs/go-cid"
-	cbornode "github.com/ipfs/go-ipld-cbor"
-	dagpb "github.com/ipld/go-codec-dagpb"
-	"github.com/ipld/go-ipld-prime"
-	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
-	"github.com/ipld/go-ipld-prime/traversal"
-	"github.com/multiformats/go-multicodec"
-	"go.uber.org/zap"
-	"golang.org/x/exp/slices"
-	"google.golang.org/protobuf/encoding/protojson"
-)
-
-const idPrefixLen = len("hm://x/") // common prefix length for all the Entities with unforgeable IDs.
-
-type indexer struct {
-	db  *sqlitex.Pool
-	log *zap.Logger
-	bs  *blockStore
-}
-
-// Reindex forcefully deletes all the information derived from the blobs and reindexes them.
-func (bs *indexer) Reindex(ctx context.Context) (err error) {
-	conn, release, err := bs.db.Conn(ctx)
-	if err != nil {
-		return err
-	}
-	defer release()
-
-	return bs.reindex(conn)
-}
-
-func (bs *indexer) reindex(conn *sqlite.Conn) (err error) {
-	start := time.Now()
-	bs.log.Info("ReindexingStarted")
-	defer func() {
-		bs.log.Info("ReindexingFinished", zap.Error(err), zap.Duration("duration", time.Since(start)))
-	}()
-
-	// Order is important to ensure foreign key constraints are not violated.
-	derivedTables := []string{
-		storage.T_BlobLinks,
-		storage.T_ResourceLinks,
-		storage.T_StructuralBlobs,
-		storage.T_GroupSites,
-		storage.T_KeyDelegations,
-		// Not deleting from resources yet, because they are referenced in the drafts table,
-		// and we can't yet reconstruct the drafts table purely from the blobs.
- // storage.T_Resources, - } - - const q = "SELECT * FROM " + storage.T_Blobs - - if err := sqlitex.WithTx(conn, func() error { - for _, table := range derivedTables { - if err := sqlitex.ExecTransient(conn, "DELETE FROM "+table, nil); err != nil { - return err - } - } - - buf := make([]byte, 0, 1024*1024) // 1MB preallocated slice to reuse for decompressing. - if err := sqlitex.ExecTransient(conn, q, func(stmt *sqlite.Stmt) error { - codec := stmt.ColumnInt64(stmt.ColumnIndex(storage.BlobsCodec.ShortName())) - - if !isIndexable(multicodec.Code(codec)) { - return nil - } - - id := stmt.ColumnInt64(stmt.ColumnIndex(storage.BlobsID.ShortName())) - hash := stmt.ColumnBytes(stmt.ColumnIndex(storage.BlobsMultihash.ShortName())) - size := stmt.ColumnInt(stmt.ColumnIndex(storage.BlobsSize.ShortName())) - data := stmt.ColumnBytesUnsafe(stmt.ColumnIndex(storage.BlobsData.ShortName())) - // We have to skip blobs we know the hashes of but we don't have the data. - // Also the blobs that are inline (data stored in the hash itself) because we don't index them ever. - // TODO(burdiyan): filter the select query to avoid fetching these blobs in the first place. - if size <= 0 { - return nil - } - - buf = buf[:0] - buf = slices.Grow(buf, size) - buf, err = bs.bs.decoder.DecodeAll(data, buf) - if err != nil { - return fmt.Errorf("failed to decompress block: %w", err) - } - - c := cid.NewCidV1(uint64(codec), hash) - hb, err := DecodeBlob(c, buf) - if err != nil { - bs.log.Warn("failed to decode blob for reindexing", zap.Error(err), zap.String("cid", c.String())) - return nil - } - - return bs.indexBlob(conn, id, hb.CID, hb.Decoded) - }); err != nil { - return err - } - - return hypersql.SetReindexTime(conn, time.Now().UTC().String()) - }); err != nil { - return err - } - - return nil -} - -// MaybeReindex will trigger reindexing if it's needed. -func (bs *indexer) MaybeReindex(ctx context.Context) error { - conn, release, err := bs.db.Conn(ctx) - if err != nil { - return err - } - defer release() - - res, err := hypersql.GetReindexTime(conn) - if err != nil { - return err - } - - if res.KVValue == "" { - return bs.reindex(conn) - } - - return nil -} - -// indexBlob is an uber-function that knows about all types of blobs we want to index. -// This is probably a bad idea to put here, but for now it's easier to work with that way. -// TODO(burdiyan): eventually we might want to make this package agnostic to blob types. 
-func (bs *indexer) indexBlob(conn *sqlite.Conn, id int64, c cid.Cid, blobData any) error { - idx := newCtx(conn) - - switch v := blobData.(type) { - case ipld.Node: - return bs.indexDagPB(idx, id, c, v) - case KeyDelegation: - return bs.indexKeyDelegation(idx, id, c, v) - case Change: - return bs.indexChange(idx, id, c, v) - case Comment: - return bs.indexComment(idx, id, c, v) - } - - return nil -} - -func (bs *indexer) indexDagPB(idx *indexingCtx, id int64, c cid.Cid, v ipld.Node) error { - sb := newSimpleStructuralBlob(c, string(TypeDagPB)) - - if err := traversal.WalkLocal(v, func(prog traversal.Progress, n ipld.Node) error { - pblink, ok := n.(dagpb.PBLink) - if !ok { - return nil - } - - target, ok := pblink.Hash.Link().(cidlink.Link) - if !ok { - return fmt.Errorf("link is not CID: %v", pblink.Hash) - } - - linkType := "dagpb/chunk" - if pblink.Name.Exists() { - if name := pblink.Name.Must().String(); name != "" { - linkType = "dagpb/" + name - } - } - - sb.AddBlobLink(linkType, target.Cid) - return nil - }); err != nil { - return err - } - - return idx.SaveBlob(id, sb) -} - -func (bs *indexer) indexKeyDelegation(idx *indexingCtx, id int64, c cid.Cid, v KeyDelegation) error { - // Validate key delegation. - { - if v.Purpose != DelegationPurposeRegistration { - return fmt.Errorf("unknown key delegation purpose %q", v.Purpose) - } - - if _, err := v.Issuer.Libp2pKey(); err != nil { - return fmt.Errorf("key delegation issuer is not a valid libp2p public key: %w", err) - } - - if _, err := v.Delegate.Libp2pKey(); err != nil { - return fmt.Errorf("key delegation delegate is not a valid libp2p public key: %w", err) - } - } - - issuerProfile := IRI("hm://a/" + v.Issuer.String()) - - sb := newStructuralBlob(c, string(TypeKeyDelegation), v.Issuer, v.IssueTime, issuerProfile, v.Issuer, time.Time{}) - - sb.AddResourceLink("kd/issuer", issuerProfile, false, nil) - - iss, err := idx.ensurePubKey(v.Issuer) - if err != nil { - return err - } - - del, err := idx.ensurePubKey(v.Delegate) - if err != nil { - return err - } - - if err := hypersql.KeyDelegationsInsertOrIgnore(idx.conn, id, iss, del); err != nil { - return err - } - - return idx.SaveBlob(id, sb) -} - -func (bs *indexer) indexChange(idx *indexingCtx, id int64, c cid.Cid, v Change) error { - // TODO(burdiyan): ensure there's only one change that brings an entity into life. - - // Extracting author from the associated key delegation. - // TODO(burdiyan): need to improve this part, because it's kinda ugly. - // TODO(burdiyan): must verify the key delegation to make sure device really belongs to the account. - author, err := bs.getAuthorFromDelegation(idx, v.Delegation) - if err != nil { - return err - } - - // Validate semantic meaning of Create/Update changes. - { - isHMEntity := v.Entity.HasPrefix("hm://") - isAccountChange := v.Entity.HasPrefix("hm://a/") - switch { - // We want to ensure all changes to have action, but historically - // we've been creating Account-related changes without one, so we continue to allow that. - case isHMEntity && v.Action == "" && !isAccountChange: - return fmt.Errorf("non-account change %s must have an action specified", c) - - // Changes with create action must have correct unforgeable ID and fields in their patch to validate it. 
-        // Changes with the Create action must have a correct unforgeable ID, and the fields in their patch to validate it.
-        case isHMEntity && v.Action == ActionCreate:
-            nonce, ok := v.Patch["nonce"].([]byte)
-            if !ok {
-                return fmt.Errorf("change that creates an entity must have a nonce to verify the ID")
-            }
-
-            ct, ok := v.Patch["createTime"].(int)
-            if !ok {
-                return fmt.Errorf("change that creates an entity must have a createTime field in its patch")
-            }
-
-            ownerField, ok := v.Patch["owner"].([]byte)
-            if !ok {
-                return fmt.Errorf("change that creates an entity must have an owner field in its patch")
-            }
-
-            if !bytes.Equal(ownerField, author) {
-                return fmt.Errorf("owner field in the create change must match the author of the change")
-            }
-
-            if err := verifyUnforgeableID(v.Entity, idPrefixLen, ownerField, nonce, int64(ct)); err != nil {
-                return err
-            }
-
-        // Changes that are updates must not have any fields for verifying unforgeable IDs,
-        // and they must have at least one dep.
-        case isHMEntity && v.Action == ActionUpdate:
-            if len(v.Deps) == 0 {
-                return fmt.Errorf("change with Update action must have at least one dep")
-            }
-
-            if v.Patch["nonce"] != nil {
-                return fmt.Errorf("update change must not have nonce set")
-            }
-
-            if v.Patch["owner"] != nil {
-                return fmt.Errorf("update change must not have owner field")
-            }
-
-            if v.Patch["createTime"] != nil {
-                return fmt.Errorf("update change must not have createTime field")
-            }
-        }
-    }
-
-    var sb structuralBlob
-    {
-        var resourceTime time.Time
-        if v.Action == ActionCreate {
-            resourceTime = v.HLCTime.Time()
-        }
-        sb = newStructuralBlob(c, string(TypeChange), author, v.HLCTime.Time(), IRI(v.Entity), author, resourceTime)
-    }
-
-    // TODO(burdiyan): ensure deps are indexed, not just known.
-    // In practice deps must always be indexed first, but we need to make sure of it.
-    for _, dep := range v.Deps {
-        if err := idx.AssertBlobData(dep); err != nil {
-            return fmt.Errorf("missing causal dependency %s of change %s", dep, c)
-        }
-
-        sb.AddBlobLink("change/dep", dep)
-    }
-
-    sb.AddBlobLink("change/auth", v.Delegation)
-
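The Create case relies on verifyUnforgeableID, whose implementation isn't shown in this diff. A hypothetical sketch of what such a check can look like, assuming the tail of an entity ID is derived from (owner, nonce, createTime) so any peer can recompute and compare it; the hash and encoding choices here (sha256, base58) are guesses, and the real derivation may differ:

```go
package indexsketch

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"

	"github.com/mr-tron/base58"
)

// verifyUnforgeableIDSketch recomputes the ID suffix from the creation
// parameters and compares it with the tail of the entity ID.
func verifyUnforgeableIDSketch(entity string, prefixLen int, owner, nonce []byte, createTime int64) error {
	var buf bytes.Buffer
	buf.Write(owner)
	buf.Write(nonce)
	_ = binary.Write(&buf, binary.BigEndian, createTime) // writes to a buffer can't fail

	sum := sha256.Sum256(buf.Bytes())
	if len(entity) < prefixLen || entity[prefixLen:] != base58.Encode(sum[:]) {
		return fmt.Errorf("entity ID doesn't match its creation parameters")
	}
	return nil
}
```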
-    // TODO(burdiyan): remove this when all the tests are fixed. Sometimes the CBOR codec decodes into
-    // different types than what was encoded, and we might not have accounted for that during indexing.
-    // So we re-encode the patch here to make sure.
-    // This is of course very wasteful.
-    // EDIT: actually, re-encoding is probably not a bad idea to enforce the canonical encoding and hash correctness.
-    // But it would probably need to happen in some other layer, in a more generalized way.
-    {
-        data, err := cbornode.DumpObject(v.Patch)
-        if err != nil {
-            return err
-        }
-        v.Patch = nil
-
-        if err := cbornode.DecodeInto(data, &v.Patch); err != nil {
-            return err
-        }
-    }
-
-    // Indexing specific to the various types of Entities.
-    switch {
-    case v.Entity.HasPrefix("hm://a/"):
-        res, err := hypersql.EntitiesLookupRemovedRecord(idx.conn, sb.Resource.ID.String())
-        if err == nil && res.DeletedResourcesIRI == sb.Resource.ID.String() {
-            return fmt.Errorf("change belongs to a deleted account [%s]", res.DeletedResourcesIRI)
-        }
-
-        if v, ok := v.Patch["avatar"].(cid.Cid); ok {
-            sb.AddBlobLink("account/avatar", v)
-        }
-
-        if alias, ok := v.Patch["alias"].(string); ok {
-            sb.Meta = alias
-        }
-
-        if doc, ok := v.Patch["rootDocument"].(string); ok {
-            sb.AddResourceLink("account/root-document", IRI(doc), false, nil)
-        }
-    case v.Entity.HasPrefix("hm://d/"):
-        res, err := hypersql.EntitiesLookupRemovedRecord(idx.conn, sb.Resource.ID.String())
-        if err == nil && res.DeletedResourcesIRI == sb.Resource.ID.String() {
-            return fmt.Errorf("change belongs to a deleted document [%s]", res.DeletedResourcesIRI)
-        }
-
-        if title, ok := v.Patch["title"].(string); ok {
-            sb.Meta = title
-        }
-
-        if blocks, ok := v.Patch["blocks"].(map[string]any); ok {
-            for id, blk := range blocks {
-                v, ok := blk.(map[string]any)["#map"]
-                if !ok {
-                    continue
-                }
-                // This is a very bad way to convert an opaque map into a block struct.
-                // TODO(burdiyan): we should do better than this. This is ugly as hell.
-                data, err := json.Marshal(v)
-                if err != nil {
-                    return err
-                }
-                blk := &documents.Block{}
-                if err := protojson.Unmarshal(data, blk); err != nil {
-                    return err
-                }
-                blk.Id = id
-                blk.Revision = c.String()
-                if err := indexURL(&sb, bs.log, blk.Id, "doc/"+blk.Type, blk.Ref); err != nil {
-                    return err
-                }
-
-                for _, ann := range blk.Annotations {
-                    if err := indexURL(&sb, bs.log, blk.Id, "doc/"+ann.Type, ann.Ref); err != nil {
-                        return err
-                    }
-                }
-            }
-        }
-    case v.Entity.HasPrefix("hm://g/"):
-        authorEntity := IRI("hm://a/" + author.String())
-
-        res, err := hypersql.EntitiesLookupRemovedRecord(idx.conn, sb.Resource.ID.String())
-        if err == nil && res.DeletedResourcesIRI == sb.Resource.ID.String() {
-            return fmt.Errorf("change belongs to a deleted group [%s]", res.DeletedResourcesIRI)
-        }
-
-        if title, ok := v.Patch["title"].(string); ok {
-            sb.Meta = title
-        }
-
-        var currentRole groups.Role
-        if v.Action == ActionCreate {
-            currentRole = groups.Role_OWNER
-            sb.AddResourceLink("group/member", authorEntity, false, GroupLinkMeta{Role: currentRole})
-        } else {
-            // Only the owner or members are allowed to make updates.
-            // TODO(burdiyan): need a better way to get the resource owner.
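The document case above turns an opaque CBOR map into a typed proto message by round-tripping through JSON. Factored out, the trick is just this (wasteful, as the TODO admits, but simple and codec-agnostic); the owner/role lookup for group updates continues right below:

```go
package indexsketch

import (
	"encoding/json"

	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

// mapToProto converts an untyped map into any proto message by marshaling
// the map to JSON and letting protojson fill in the typed fields.
func mapToProto(m map[string]any, out proto.Message) error {
	data, err := json.Marshal(m)
	if err != nil {
		return err
	}
	return protojson.Unmarshal(data, out)
}
```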
-            rid, err := idx.ensureResource(IRI(v.Entity))
-            if err != nil {
-                return err
-            }
-
-            authorID, err := idx.ensurePubKey(author)
-            if err != nil {
-                return err
-            }
-
-            owner, err := hypersql.ResourceGetOwner(idx.conn, rid)
-            if err != nil {
-                return err
-            }
-
-            if owner == authorID {
-                currentRole = groups.Role_OWNER
-            } else {
-                role, err := hypersql.GetGroupRole(idx.conn, v.Entity.String(), authorEntity.String())
-                if err != nil {
-                    return err
-                }
-                currentRole = groups.Role(role)
-            }
-        }
-
-        if currentRole != groups.Role_OWNER && currentRole != groups.Role_EDITOR {
-            return fmt.Errorf("only members can change groups: account %q is not a member of the group %q", author, v.Entity)
-        }
-
-        // Check whether any owner-only fields are touched by non-owners.
-        if currentRole != groups.Role_OWNER {
-            if v.Patch["siteURL"] != nil {
-                return fmt.Errorf("only the group owner can set siteURL")
-            }
-
-            if v.Patch["members"] != nil {
-                return fmt.Errorf("only the group owner can change members")
-            }
-        }
-
-        // Validate the site URL.
-        if siteURL, ok := v.Patch["siteURL"].(string); ok {
-            u, err := url.Parse(siteURL)
-            if err != nil {
-                return fmt.Errorf("failed to parse site URL %s: %w", siteURL, err)
-            }
-
-            if u.Scheme != "http" && u.Scheme != "https" {
-                return fmt.Errorf("site URL must have http or https scheme, got %s", siteURL)
-            }
-
-            if siteURL != (&url.URL{Scheme: u.Scheme, Host: u.Host}).String() {
-                return fmt.Errorf("site URL must have only scheme and host, got %s", siteURL)
-            }
-
-            if err := hypersql.SitesInsertOrIgnore(idx.conn, v.Entity.String(), siteURL, int64(v.HLCTime), OriginFromCID(c)); err != nil {
-                return err
-            }
-        }
-
-        // Index content links.
-        if content, ok := v.Patch["content"].(map[string]any); ok {
-            for path, v := range content {
-                rawURL, ok := v.(string)
-                if !ok {
-                    bs.log.Warn("group content value is not a string", zap.Any("value", v), zap.String("path", path))
-                    continue
-                }
-
-                if err := indexURL(&sb, bs.log, path, "group/content", rawURL); err != nil {
-                    return err
-                }
-            }
-        }
-
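The site-URL rule above is worth seeing in isolation: re-serializing only {Scheme, Host} must reproduce the input exactly, which rejects paths, queries, and even a trailing slash. A small runnable demonstration (member-link indexing continues below):

```go
package main

import (
	"fmt"
	"net/url"
)

// isBareOrigin reports whether raw is exactly an http(s) scheme plus host,
// mirroring the check in the group-change validation above.
func isBareOrigin(raw string) bool {
	u, err := url.Parse(raw)
	if err != nil {
		return false
	}
	return (u.Scheme == "http" || u.Scheme == "https") &&
		raw == (&url.URL{Scheme: u.Scheme, Host: u.Host}).String()
}

func main() {
	fmt.Println(isBareOrigin("https://example.com"))      // true
	fmt.Println(isBareOrigin("https://example.com/"))     // false: trailing slash
	fmt.Println(isBareOrigin("https://example.com/blog")) // false: has a path
	fmt.Println(isBareOrigin("ipfs://example.com"))       // false: wrong scheme
}
```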
-        // Index member links.
-        if members, ok := v.Patch["members"].(map[string]any); ok {
-            for k, v := range members {
-                acc, err := core.DecodePrincipal(k)
-                if err != nil {
-                    return fmt.Errorf("failed to parse group member as principal: %w", err)
-                }
-
-                role, ok := v.(int)
-                if !ok {
-                    return fmt.Errorf("member must have a valid role")
-                }
-
-                if role == int(groups.Role_ROLE_UNSPECIFIED) {
-                    return fmt.Errorf("member role must be specified")
-                }
-
-                if role == int(groups.Role_OWNER) {
-                    return fmt.Errorf("owner role can't be used in updates")
-                }
-
-                if _, _, err := idx.ensureAccount(acc); err != nil {
-                    return err
-                }
-
-                sb.AddResourceLink("group/member", IRI("hm://a/"+acc.String()), false, GroupLinkMeta{Role: groups.Role(role)})
-            }
-        }
-    }
-
-    return idx.SaveBlob(id, sb)
-}
-
-func (bs *indexer) indexComment(idx *indexingCtx, id int64, c cid.Cid, v Comment) error {
-    if v.Target == "" {
-        return fmt.Errorf("comment must have a target")
-    }
-
-    if !strings.HasPrefix(v.Target, "hm://") {
-        return fmt.Errorf("comment target must be a hypermedia resource, got %s", v.Target)
-    }
-
-    iri := strings.Split(v.Target, "?v=")[0]
-    res, err := hypersql.EntitiesLookupRemovedRecord(idx.conn, iri)
-    if err == nil && res.DeletedResourcesIRI == iri {
-        return fmt.Errorf("comment references a deleted entity [%s]", res.DeletedResourcesIRI)
-    }
-
-    isReply := v.RepliedComment.Defined() || v.ThreadRoot.Defined()
-
-    if isReply {
-        if !v.RepliedComment.Defined() || !v.ThreadRoot.Defined() {
-            return fmt.Errorf("replies must have both repliedComment and threadRoot set")
-        }
-
-        blk, err := bs.bs.get(idx.conn, v.RepliedComment)
-        if err != nil {
-            return err
-        }
-
-        replied, err := DecodeBlob(blk.Cid(), blk.RawData())
-        if err != nil {
-            return fmt.Errorf("failed to decode replied comment %s: %w", v.RepliedComment, err)
-        }
-
-        rc, ok := replied.Decoded.(Comment)
-        if !ok {
-            return fmt.Errorf("replied comment is not a comment, got %T", replied.Decoded)
-        }
-
-        if v.HLCTime < rc.HLCTime {
-            return fmt.Errorf("reply must have a higher timestamp than the replied comment: failed to assert %s > %s", v.HLCTime, rc.HLCTime)
-        }
-
-        repliedTarget, _, _ := strings.Cut(rc.Target, "?")
-        newReplyTarget, _, _ := strings.Cut(v.Target, "?")
-
-        if newReplyTarget != repliedTarget {
-            return fmt.Errorf("reply target %q doesn't match the replied comment's target %q", newReplyTarget, repliedTarget)
-        }
-
-        // Replies to replies must share the thread root.
-        // Replies to top-level comments must have the thread root equal to the top-level comment itself.
-        if rc.ThreadRoot.Defined() {
-            if !v.ThreadRoot.Equals(rc.ThreadRoot) {
-                return fmt.Errorf("reply-to-reply thread roots don't match: %q != %q", v.ThreadRoot, rc.ThreadRoot)
-            }
-        } else {
-            // TODO(burdiyan): this will not be true when we implement editing comments.
-            if !v.ThreadRoot.Equals(v.RepliedComment) {
-                return fmt.Errorf("reply to a top-level comment must have the same thread root as the replied comment: %q != %q", v.ThreadRoot, v.RepliedComment)
-            }
-        }
-    }
-
-    author, err := bs.getAuthorFromDelegation(idx, v.Delegation)
-    if err != nil {
-        return err
-    }
-
-    sb := newStructuralBlob(c, string(TypeComment), author, v.HLCTime.Time(), "", nil, time.Time{})
-
-    if err := indexURL(&sb, bs.log, "", "comment/target", v.Target); err != nil {
-        return err
-    }
-
-    if v.ThreadRoot.Defined() {
-        sb.AddBlobLink("comment/thread-root", v.ThreadRoot)
-    }
-
-    if v.RepliedComment.Defined() {
-        sb.AddBlobLink("comment/reply-to", v.RepliedComment)
-    }
-
-    sb.AddBlobLink("comment/auth", v.Delegation)
-
-    var indexCommentContent func([]CommentBlock) error // workaround to allow recursive closure calls.
-    indexCommentContent = func(in []CommentBlock) error {
-        for _, blk := range in {
-            if err := indexURL(&sb, bs.log, blk.ID, "comment/"+blk.Type, blk.Ref); err != nil {
-                return err
-            }
-
-            for _, a := range blk.Annotations {
-                if err := indexURL(&sb, bs.log, blk.ID, "comment/"+a.Type, a.Ref); err != nil {
-                    return err
-                }
-            }
-
-            if err := indexCommentContent(blk.Children); err != nil {
-                return err
-            }
-        }
-
-        return nil
-    }
-
-    if err := indexCommentContent(v.Body); err != nil {
-        return err
-    }
-
-    if err := idx.SaveBlob(id, sb); err != nil {
-        return fmt.Errorf("failed to index comment: %w", err)
-    }
-
-    return nil
-}
-
-func (bs *indexer) getAuthorFromDelegation(idx *indexingCtx, delegation cid.Cid) (core.Principal, error) {
-    // TODO(burdiyan): it's also quite stupid having to get this from the DB.
-    iss, err := hypersql.KeyDelegationsGetIssuer(idx.conn, delegation.Hash())
-    if err != nil {
-        return nil, err
-    }
-
-    if iss.KeyDelegationsIssuer == 0 {
-        // Try to get the issuer from the actual blob. This can happen when we're reindexing all the blobs,
-        // and we happen to index a change before the key delegation.
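The comment-body walk above uses a self-referencing closure (declared before assignment so it can call itself). The same traversal written as a named recursive function, with a stand-in block type to keep the sketch self-contained; getAuthorFromDelegation continues below:

```go
package indexsketch

// commentBlock is a stand-in for the real CommentBlock type.
type commentBlock struct {
	ID       string
	Ref      string
	Children []commentBlock
}

// walkBlocks visits every block depth-first, descending into Children,
// and stops at the first error, just like the closure above.
func walkBlocks(in []commentBlock, visit func(commentBlock) error) error {
	for _, blk := range in {
		if err := visit(blk); err != nil {
			return err
		}
		if err := walkBlocks(blk.Children, visit); err != nil {
			return err
		}
	}
	return nil
}
```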
-
-        blk, err := bs.bs.get(idx.conn, delegation)
-        if err != nil {
-            return nil, err
-        }
-
-        var del KeyDelegation
-        if err := cbornode.DecodeInto(blk.RawData(), &del); err != nil {
-            return nil, fmt.Errorf("failed to decode key delegation %s: %w", delegation, err)
-        }
-
-        iss.KeyDelegationsIssuer, err = idx.ensurePubKey(del.Issuer)
-        if err != nil {
-            return nil, err
-        }
-
-        if iss.KeyDelegationsIssuer == 0 {
-            return nil, fmt.Errorf("missing key delegation info %s", delegation)
-        }
-    }
-
-    issuerDB, err := hypersql.PublicKeysLookupPrincipal(idx.conn, iss.KeyDelegationsIssuer)
-    if err != nil {
-        return nil, err
-    }
-
-    author, err := core.DecodePrincipal(issuerDB.PublicKeysPrincipal)
-    if err != nil {
-        return nil, err
-    }
-
-    if err := idx.AssertBlobData(delegation); err != nil {
-        return nil, err
-    }
-
-    return author, nil
-}
-
-func indexURL(sb *structuralBlob, log *zap.Logger, anchor, linkType, rawURL string) error {
-    if rawURL == "" {
-        return nil
-    }
-
-    u, err := url.Parse(rawURL)
-    if err != nil {
-        log.Warn("FailedToParseURL", zap.String("url", rawURL), zap.Error(err))
-        return nil
-    }
-
-    switch {
-    case u.Scheme == "hm" && u.Host != "c":
-        uq := u.Query()
-
-        linkMeta := DocLinkMeta{
-            Anchor:         anchor,
-            TargetFragment: u.Fragment,
-            TargetVersion:  uq.Get("v"),
-        }
-
-        target := IRI("hm://" + u.Host + u.Path)
-
-        isLatest := uq.Has("l") || linkMeta.TargetVersion == ""
-
-        sb.AddResourceLink(linkType, target, !isLatest, linkMeta)
-
-        vblobs, err := Version(linkMeta.TargetVersion).Parse()
-        if err != nil {
-            return err
-        }
-
-        for _, vcid := range vblobs {
-            sb.AddBlobLink(linkType, vcid)
-        }
-    case u.Scheme == "hm" && u.Host == "c":
-        c, err := cid.Decode(strings.TrimPrefix(u.Path, "/"))
-        if err != nil {
-            return fmt.Errorf("failed to parse comment CID %s: %w", rawURL, err)
-        }
-
-        sb.AddBlobLink(linkType, c)
-    case u.Scheme == "ipfs":
-        c, err := cid.Decode(u.Hostname())
-        if err != nil {
-            return fmt.Errorf("failed to parse IPFS URL %s: %w", rawURL, err)
-        }
-
-        sb.AddBlobLink(linkType, c)
-    }
-
-    return nil
-}
-
-// GroupLinkMeta is metadata for a group link.
-type GroupLinkMeta struct {
-    Role groups.Role `json:"r"`
-}
-
-// DocLinkMeta is metadata for a document link.
-type DocLinkMeta struct {
-    Anchor         string `json:"a,omitempty"`
-    TargetFragment string `json:"f,omitempty"`
-    TargetVersion  string `json:"v,omitempty"`
-}
-
-func isIndexable[T multicodec.Code | cid.Cid](v T) bool {
-    var code multicodec.Code
-
-    switch v := any(v).(type) {
-    case multicodec.Code:
-        code = v
-    case cid.Cid:
-        code = multicodec.Code(v.Prefix().Codec)
-    }
-
-    return code == multicodec.DagCbor || code == multicodec.DagPb
-}
diff --git a/backend/hyper/indexing_structural_blob.go b/backend/hyper/indexing_structural_blob.go
deleted file mode 100644
index 367f4b36..00000000
--- a/backend/hyper/indexing_structural_blob.go
+++ /dev/null
@@ -1,334 +0,0 @@
-package hyper
-
-import (
-    "encoding/json"
-    "fmt"
-    "seed/backend/core"
-    "seed/backend/hyper/hypersql"
-    "seed/backend/ipfs"
-    "seed/backend/util/maybe"
-    "time"
-
-    "crawshaw.io/sqlite"
-    "github.com/ipfs/go-cid"
-)
-
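What indexURL extracts from a hypermedia link, shown standalone with Go's net/url; the URL shape (`hm://<entity>?v=<version>&l#<block>`) is inferred from the code above:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	u, _ := url.Parse("hm://d/my-doc?v=baf123#b42")

	fmt.Println("hm://" + u.Host + u.Path) // target IRI: hm://d/my-doc
	fmt.Println(u.Query().Get("v"))        // pinned version: baf123
	fmt.Println(u.Query().Has("l"))        // explicit latest-version flag: false
	fmt.Println(u.Fragment)                // anchor block inside the target: b42
}
```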
-// IRI is an identifier of a Hypermedia resource.
-//
-// [iri]: https://en.wikipedia.org/wiki/Internationalized_Resource_Identifier
-type IRI string
-
-func (i IRI) String() string {
-    return string(i)
-}
-
-type structuralBlob struct {
-    ID       cid.Cid
-    Type     string
-    Author   core.Principal
-    Time     time.Time
-    Resource struct {
-        ID         IRI
-        Owner      core.Principal
-        CreateTime time.Time
-    }
-    BlobLinks     []blobLink
-    ResourceLinks []resourceLink
-    Meta          any
-}
-
-func newStructuralBlob(id cid.Cid, blobType string, author core.Principal, ts time.Time, resource IRI, resourceOwner core.Principal, resourceTimestamp time.Time) structuralBlob {
-    sb := structuralBlob{
-        ID:     id,
-        Type:   blobType,
-        Author: author,
-        Time:   ts,
-    }
-    sb.Resource.ID = resource
-    sb.Resource.Owner = resourceOwner
-    sb.Resource.CreateTime = resourceTimestamp
-
-    return sb
-}
-
-func newSimpleStructuralBlob(id cid.Cid, blobType string) structuralBlob {
-    return structuralBlob{ID: id, Type: blobType}
-}
-
-func (sb *structuralBlob) AddBlobLink(linkType string, target cid.Cid) {
-    sb.BlobLinks = append(sb.BlobLinks, blobLink{Type: linkType, Target: target})
-}
-
-func (sb *structuralBlob) AddResourceLink(linkType string, target IRI, isPinned bool, meta any) {
-    sb.ResourceLinks = append(sb.ResourceLinks, resourceLink{Type: linkType, Target: target, IsPinned: isPinned, Meta: meta})
-}
-
-type blobLink struct {
-    Type   string
-    Target cid.Cid
-}
-
-type resourceLink struct {
-    Type     string
-    Target   IRI
-    IsPinned bool
-    Meta     any
-}
-
-type indexingCtx struct {
-    conn *sqlite.Conn
-
-    // Lookup tables for internal database IDs.
-    pubKeys   map[string]int64
-    resources map[IRI]int64
-    blobs     map[cid.Cid]int64
-}
-
-func newCtx(conn *sqlite.Conn) *indexingCtx {
-    return &indexingCtx{
-        conn: conn,
-        // Arbitrary initial size for the maps, to avoid dynamic resizing in most cases.
-        pubKeys:   make(map[string]int64, 16),
-        resources: make(map[IRI]int64, 16),
-        blobs:     make(map[cid.Cid]int64, 16),
-    }
-}
-
-func (idx *indexingCtx) SaveBlob(id int64, b structuralBlob) error {
-    var (
-        blobAuthor   maybe.Value[int64]
-        blobResource maybe.Value[int64]
-        blobTime     maybe.Value[int64]
-        title        maybe.Value[[]byte]
-    )
-
-    if b.Author != nil {
-        _, kid, err := idx.ensureAccount(b.Author)
-        if err != nil {
-            return err
-        }
-        blobAuthor = maybe.New(kid)
-    }
-
-    if b.Resource.ID != "" {
-        rid, err := idx.ensureResource(b.Resource.ID)
-        if err != nil {
-            return err
-        }
-        blobResource = maybe.New(rid)
-
-        if err := idx.ensureResourceMetadata(b.Resource.ID, b.Resource.Owner, b.Resource.CreateTime); err != nil {
-            return err
-        }
-    }
-
-    if b.Meta != nil {
-        data, err := json.Marshal(b.Meta)
-        if err != nil {
-            return err
-        }
-        title = maybe.New(data)
-    }
-
-    if !b.Time.IsZero() {
-        // For changes we need a microsecond timestamp, so we use it for all the blobs.
-        blobTime = maybe.New(b.Time.UnixMicro())
-    }
-
-    if err := hypersql.StructuralBlobsInsert(idx.conn, id, b.Type, blobAuthor, blobResource, blobTime, title); err != nil {
-        return err
-    }
-
-    for _, link := range b.BlobLinks {
-        tgt, err := idx.ensureBlob(link.Target)
-        if err != nil {
-            return fmt.Errorf("failed to ensure link target blob %s: %w", link.Target, err)
-        }
-        if err := hypersql.BlobLinksInsertOrIgnore(idx.conn, id, link.Type, tgt); err != nil {
-            return fmt.Errorf("failed to insert blob link: %w", err)
-        }
-    }
-
-    for _, link := range b.ResourceLinks {
-        tgt, err := idx.ensureResource(link.Target)
-        if err != nil {
-            return fmt.Errorf("failed to ensure resource %s: %w", link.Target, err)
-        }
-
-        meta, err := json.Marshal(link.Meta)
-        if err != nil {
-            return fmt.Errorf("failed to encode resource link metadata as json: %w", err)
-        }
-
-        if err := hypersql.ResourceLinksInsert(idx.conn, id, tgt, link.Type, link.IsPinned, meta); err != nil {
-            return fmt.Errorf("failed to insert resource link: %w", err)
-        }
-    }
-
-    return nil
-}
-
-func (idx *indexingCtx) AssertBlobData(c cid.Cid) (err error) {
-    delid, err := hypersql.BlobsGetSize(idx.conn, c.Hash())
-    if err != nil {
-        return err
-    }
-    if delid.BlobsID == 0 {
-        return fmt.Errorf("blob %q not found", c)
-    }
-
-    if delid.BlobsSize < 0 {
-        return fmt.Errorf("blob %q is known, but has no data", c)
-    }
-
-    return nil
-}
-
-func (idx *indexingCtx) ensureAccount(key core.Principal) (aid, kid int64, err error) {
-    kid, err = idx.ensurePubKey(key)
-    if err != nil {
-        return 0, 0, err
-    }
-
-    accountResource := IRI("hm://a/" + key.String())
-
-    aid, err = idx.ensureResource(accountResource)
-    if err != nil {
-        return 0, 0, err
-    }
-
-    if err := idx.ensureResourceMetadata(accountResource, key, time.Time{}); err != nil {
-        return 0, 0, err
-    }
-
-    return aid, kid, nil
-}
-
-func (idx *indexingCtx) ensurePubKey(key core.Principal) (int64, error) {
-    if id, ok := idx.pubKeys[key.UnsafeString()]; ok {
-        return id, nil
-    }
-
-    res, err := hypersql.PublicKeysLookupID(idx.conn, key)
-    if err != nil {
-        return 0, err
-    }
-
-    var id int64
-    if res.PublicKeysID > 0 {
-        id = res.PublicKeysID
-    } else {
-        ins, err := hypersql.PublicKeysInsert(idx.conn, key)
-        if err != nil {
-            return 0, err
-        }
-
-        if ins.PublicKeysID <= 0 {
-            panic("BUG: failed to insert key for some reason")
-        }
-
-        id = ins.PublicKeysID
-    }
-
-    idx.pubKeys[key.UnsafeString()] = id
-    return id, nil
-}
-
-func (idx *indexingCtx) ensureBlob(c cid.Cid) (int64, error) {
-    if id, ok := idx.blobs[c]; ok {
-        return id, nil
-    }
-
-    codec, hash := ipfs.DecodeCID(c)
-
-    size, err := hypersql.BlobsGetSize(idx.conn, hash)
-    if err != nil {
-        return 0, err
-    }
-
-    var id int64
-    if size.BlobsID != 0 {
-        id = size.BlobsID
-    } else {
-        ins, err := hypersql.BlobsInsert(idx.conn, 0, hash, int64(codec), nil, -1)
-        if err != nil {
-            return 0, err
-        }
-        if ins.BlobsID == 0 {
-            return 0, fmt.Errorf("failed to ensure blob %s after insert", c)
-        }
-        id = ins.BlobsID
-    }
-
-    idx.blobs[c] = id
-    return id, nil
-}
-
-func (idx *indexingCtx) ensureResource(r IRI) (int64, error) {
-    if id, ok := idx.resources[r]; ok {
-        return id, nil
-    }
-
-    res, err := hypersql.EntitiesLookupID(idx.conn, string(r))
-    if err != nil {
-        return 0, err
-    }
-
-    var id int64
-    if res.ResourcesID > 0 {
-        id = res.ResourcesID
-    } else {
-        ins, err := hypersql.EntitiesInsertOrIgnore(idx.conn, string(r))
-        if err != nil {
-            return 0, err
-        }
-
-        if ins.EntitiesID <= 0 {
-            panic("BUG: failed to insert resource for some reason")
-        }
-
-        id = ins.EntitiesID
-    }
-
-    idx.resources[r] = id
-    return id, nil
-}
-
-func (idx *indexingCtx) ensureResourceMetadata(r IRI, owner core.Principal, createTime time.Time) error {
-    id, err := idx.ensureResource(r)
-    if err != nil {
-        return err
-    }
-
-    if owner != nil {
-        oid, err := idx.ensurePubKey(owner)
-        if err != nil {
-            return err
-        }
-
-        if _, err := hypersql.ResourcesMaybeSetOwner(idx.conn, id, oid); err != nil {
-            return err
-        }
-    }
-
-    if !createTime.IsZero() {
-        // We don't need microsecond precision for create time in resources. It's mostly here for convenience anyway.
-        if _, err := hypersql.ResourcesMaybeSetTimestamp(idx.conn, id, createTime.Unix()); err != nil {
-            return err
-        }
-    }
-
-    return nil
-}
diff --git a/backend/hyper/terra.cue b/backend/hyper/terra.cue
deleted file mode 100644
index f628a12f..00000000
--- a/backend/hyper/terra.cue
+++ /dev/null
@@ -1,72 +0,0 @@
-// This is an experiment for defining our blob types in a more formal,
-// portable way using the Cue language: https://cuelang.org.
-// Work in progress. You should probably just ignore this file.
-
-package terra
-
-// Principal is a binary representation of a public key,
-// specifying the type of the key. See more in `backend/core/principal.go`.
-#Principal: bytes
-
-// TimeUnix is a Unix timestamp in seconds.
-// Even though these can normally be negative to indicate a timestamp before the beginning of the Unix epoch,
-// that wouldn't make sense in our system, because our app was released around 2023.
-#TimeUnix: int64 & >0
-
-// TimeMicro is a Unix timestamp in microseconds.
-// The higher granularity is useful for CRDT conflict resolution.
-// This is in fact a Hybrid Logical Clock (HLC) timestamp.
-#TimeMicro: int64 & >0
-
-// Signature is the raw bytes of an asymmetric cryptographic signature.
-#Signature: bytes
-
-// CID is a binary representation of an IPFS content identifier.
-#CID: bytes
-
-// IRI is a Hypermedia resource identifier.
-// https://en.wikipedia.org/wiki/Internationalized_Resource_Identifier.
-#IRI: string
-
-// SignedBlob is any struct that has a cryptographic signature.
-// Our way of signing blobs is not the most efficient, but it's simple and easy to implement in most programming languages.
-// It requires serializing the structure twice: once without the signature field, to obtain the bytes to sign, and then
-// with the signature field itself to complete the entire structure.
-#SignedBlob: {
-    ...
-    sig: #Signature
-}
-
-// TypedBlob is a struct that has a type specified.
-// All the blobs we create have this type, so they are easy to identify and process.
-#TypedBlob: {
-    "@type": string
-    ...
-}
-
-// BaseBlob represents the base structure of all the blobs we create.
-// They are all signed and typed.
-#BaseBlob: #TypedBlob & #SignedBlob
-
-// KeyDelegation is a blob that allows one public key (delegate) to act on behalf of another key (issuer).
-#KeyDelegation: #BaseBlob & {
-    "@type":   "KeyDelegation"
-    issuer:    #Principal
-    delegate:  #Principal
-    issueTime: #TimeUnix
-    purpose:   "DeviceRegistration"
-}
-
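The two-pass signing scheme #SignedBlob describes can be sketched in Go as follows, written as if it lived alongside terra.go; `signedDoc` is a hypothetical type, while the real blobs follow the same steps in their constructors (the #BaseChange definition continues below):

```go
package hyper

import (
	"crypto/ed25519"

	cbornode "github.com/ipfs/go-ipld-cbor"
)

// signedDoc is a hypothetical signed blob.
type signedDoc struct {
	Payload string `refmt:"payload"`
	Sig     []byte `refmt:"sig,omitempty"` // omitempty keeps the empty sig out of the signing bytes.
}

func init() { cbornode.RegisterCborType(signedDoc{}) }

// signDoc encodes the struct with the signature cleared, signs those bytes,
// then attaches the signature; storing the blob re-encodes the full struct.
func signDoc(priv ed25519.PrivateKey, d signedDoc) (signedDoc, error) {
	d.Sig = nil // Pass 1: serialize without the signature field.
	data, err := cbornode.DumpObject(d)
	if err != nil {
		return d, err
	}
	d.Sig = ed25519.Sign(priv, data) // Pass 2: attach the signature.
	return d, nil
}
```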
-// BaseChange is a common structure of all the Change blobs we produce.
-// Changes describe mutations of our CRDT Entities.
-#BaseChange: #BaseBlob & {
-    "@type":    "Change"
-    deps: [#CID]
-    delegation: #CID
-    action:     "Create" | "Update"
-    message?:   string
-    hlcTime:    #TimeMicro
-    entity:     #IRI
-    patch: {}
-    signer: #Principal
-}
diff --git a/backend/hyper/terra.go b/backend/hyper/terra.go
deleted file mode 100644
index 09f03d5a..00000000
--- a/backend/hyper/terra.go
+++ /dev/null
@@ -1,312 +0,0 @@
-package hyper
-
-import (
-    "fmt"
-    "seed/backend/core"
-    "seed/backend/hlc"
-    "time"
-
-    "github.com/ipfs/go-cid"
-    cbornode "github.com/ipfs/go-ipld-cbor"
-    "github.com/polydawn/refmt/obj/atlas"
-)
-
-var timeAtlas = atlas.BuildEntry(time.Time{}).Transform().
-    TransformMarshal(atlas.MakeMarshalTransformFunc(func(t time.Time) (int64, error) {
-        return t.Unix(), nil
-    })).
-    TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(func(in int64) (time.Time, error) {
-        return time.Unix(in, 0), nil
-    })).
-    Complete()
-
-func init() {
-    cbornode.RegisterCborType(timeAtlas)
-    cbornode.RegisterCborType(KeyDelegation{})
-    cbornode.RegisterCborType(Change{})
-    cbornode.RegisterCborType(Comment{})
-    cbornode.RegisterCborType(Block{})
-    cbornode.RegisterCborType(Annotation{})
-    cbornode.RegisterCborType(CommentBlock{})
-}
-
-// Available blob types.
-const (
-    TypeKeyDelegation BlobType = "KeyDelegation"
-    TypeChange        BlobType = "Change"
-    TypeDagPB         BlobType = "DagPB"
-    TypeComment       BlobType = "Comment"
-)
-
-// Delegation purposes.
-// The delegation object doesn't make a distinction between key types,
-// so just by looking at it we can't know whether the issuer is actually an account.
-// This should give us more flexibility for the future, if we want to delegate
-// keys from accounts to accounts, or from devices to other devices.
-// So to distinguish between delegation types there's a field to specify the purpose.
const (
-    DelegationPurposeRegistration = "DeviceRegistration" // assumes the issuer is an account and the delegate is a device.
-)
-
-// KeyDelegation is a signed payload which allows one key pair (delegate)
-// to participate in the network on behalf of another key (issuer).
-// The delegation is signed by the issuer, and used by the delegate.
-// The field names `aud`, `iss`, `nbf` are borrowed from the JWT specification.
-type KeyDelegation struct {
-    Type      BlobType       `refmt:"@type"`
-    Issuer    core.Principal `refmt:"issuer"`
-    Delegate  core.Principal `refmt:"delegate"`
-    IssueTime time.Time      `refmt:"issueTime"`
-    Purpose   string         `refmt:"purpose"`
-    Signature core.Signature `refmt:"sig,omitempty"` // omitempty for signing.
-}
-
-// NewKeyDelegation creates a new signed key delegation from one key to another.
-func NewKeyDelegation(issuer core.KeyPair, delegate core.PublicKey, validFrom time.Time) (kd KeyDelegation, err error) {
-    if validFrom.IsZero() {
-        return kd, fmt.Errorf("must specify a valid-from timestamp")
-    }
-
-    d := KeyDelegation{
-        Type:      TypeKeyDelegation,
-        Issuer:    issuer.Principal(),
-        Delegate:  delegate.Principal(),
-        Purpose:   DelegationPurposeRegistration,
-        IssueTime: validFrom,
-    }
-
-    data, err := cbornode.DumpObject(d)
-    if err != nil {
-        return kd, fmt.Errorf("failed to encode signing bytes for key delegation: %w", err)
-    }
-
-    sig, err := issuer.Sign(data)
-    if err != nil {
-        return kd, fmt.Errorf("failed to sign key delegation: %w", err)
-    }
-
-    d.Signature = sig
-
-    return d, nil
-}
-
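A hypothetical end-to-end use of the delegation API above, written as if in this package: an account key delegates to a device key, and any peer can then verify the result offline with Verify (defined just below):

```go
package hyper

import (
	"seed/backend/core"
	"time"
)

// delegationRoundTrip creates a delegation from the account key to the device
// key and immediately verifies the issuer's signature over it.
func delegationRoundTrip(account core.KeyPair, device core.PublicKey) error {
	kd, err := NewKeyDelegation(account, device, time.Now())
	if err != nil {
		return err
	}
	return kd.Verify()
}
```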
-// Verify checks the signature of the delegation.
-func (kd KeyDelegation) Verify() error {
-    sig := kd.Signature
-    kd.Signature = nil
-
-    data, err := cbornode.DumpObject(kd)
-    if err != nil {
-        return fmt.Errorf("failed to encode signing bytes to verify key delegation: %w", err)
-    }
-
-    if err := kd.Issuer.Verify(data, sig); err != nil {
-        return err
-    }
-
-    return nil
-}
-
-// Blob encodes the delegation into a blob.
-func (kd KeyDelegation) Blob() Blob {
-    hb, err := EncodeBlob(kd)
-    if err != nil {
-        panic(err)
-    }
-    return hb
-}
-
-// Actions for entity changes.
-const (
-    ActionCreate = "Create"
-    ActionUpdate = "Update"
-)
-
-// Change for a Seed mutable Entity.
-type Change struct {
-    // Type is always the same (see constants).
-    Type BlobType `refmt:"@type"`
-
-    // Deps is a list of dependency patches.
-    Deps []cid.Cid `refmt:"deps,omitempty"`
-
-    // Delegation points to the blob where we can get the Account ID
-    // on whose behalf this blob is signed.
-    Delegation cid.Cid `refmt:"delegation"`
-
-    // Action is an optional machine-readable description of the action the Change describes.
-    Action string `refmt:"action,omitempty"`
-
-    // Message is an optional human-readable message.
-    Message string `refmt:"message,omitempty"`
-
-    // HLCTime is the Hybrid Logical Clock timestamp.
-    // Must be greater than that of any of the deps.
-    // Can be used as a Unix timestamp in *microseconds*.
-    HLCTime hlc.Timestamp `refmt:"hlcTime"`
-
-    // Entity is an arbitrary string describing the entity to mutate.
-    // Meant to be globally unique, and should include the type of the entity if relevant.
-    // Using an IRI[iri] might be a good option.
-    //
-    // [iri]: https://en.wikipedia.org/wiki/Internationalized_Resource_Identifier
-    Entity EntityID `refmt:"entity"`
-
-    // Patch is the body of our Merge Patch CRDT.
-    // TODO(burdiyan): point to docs.
-    Patch map[string]any `refmt:"patch,omitempty"`
-
-    // Signer is the public key of the signer.
-    Signer core.Principal `refmt:"signer,omitempty"`
-
-    // Sig is the signature over the rest of the fields.
-    Sig core.Signature `refmt:"sig,omitempty"`
-}
-
-// NewChange creates a new Change blob.
-func NewChange(eid EntityID, deps []cid.Cid, ts hlc.Timestamp, signer core.KeyPair, delegation cid.Cid, patch map[string]any, opts ...ChangeOption) (hb Blob, err error) {
-    // Make sure the deps field is not present in the blob if there are no deps.
-    if len(deps) == 0 {
-        deps = nil
-    }
-
-    if len(patch) == 0 && len(deps) == 1 {
-        return hb, fmt.Errorf("new changes must have a patch: nothing to update")
-    }
-
-    SortCIDs(deps)
-
-    ch := Change{
-        Type:       TypeChange,
-        Entity:     eid,
-        Deps:       deps,
-        Delegation: delegation,
-        HLCTime:    ts,
-        Patch:      patch,
-        Signer:     signer.Principal(),
-    }
-    for _, o := range opts {
-        o(&ch)
-    }
-
-    sigdata, err := cbornode.DumpObject(ch)
-    if err != nil {
-        return hb, fmt.Errorf("failed to encode signing bytes for change: %w", err)
-    }
-
-    ch.Sig, err = signer.Sign(sigdata)
-    if err != nil {
-        return hb, fmt.Errorf("failed to sign change: %w", err)
-    }
-
-    hb, err = EncodeBlob(ch)
-    if err != nil {
-        return hb, err
-    }
-
-    return hb, nil
-}
-
-// Verify checks the change signature.
-func (ch Change) Verify() error {
-    sig := ch.Sig
-    ch.Sig = nil
-
-    data, err := cbornode.DumpObject(ch)
-    if err != nil {
-        return fmt.Errorf("failed to encode signing bytes to verify change blob: %w", err)
-    }
-
-    if err := ch.Signer.Verify(data, sig); err != nil {
-        return err
-    }
-
-    return nil
-}
-
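A hypothetical use of NewChange above, as if written in this package: an Update change with one dependency that rewrites the title field. The caller is assumed to already hold the delegation CID and an HLC timestamp, and the Action field would be set through a ChangeOption (whose concrete constructors aren't shown in this diff):

```go
package hyper

import (
	"seed/backend/core"
	"seed/backend/hlc"

	"github.com/ipfs/go-cid"
)

// makeTitleUpdate builds a signed Change that depends on its parent change
// and patches a single field of the entity.
func makeTitleUpdate(eid EntityID, parent, delegation cid.Cid, ts hlc.Timestamp, me core.KeyPair) (Blob, error) {
	return NewChange(eid, []cid.Cid{parent}, ts, me, delegation, map[string]any{
		"title": "Renamed document",
	})
}
```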
-// Comment is a signed blob representing a comment or a reply.
-type Comment struct {
-    Type           BlobType       `refmt:"@type"`
-    Delegation     cid.Cid        `refmt:"delegation"`
-    Target         string         `refmt:"target,omitempty"`
-    ThreadRoot     cid.Cid        `refmt:"threadRoot,omitempty"`
-    RepliedComment cid.Cid        `refmt:"repliedComment,omitempty"`
-    HLCTime        hlc.Timestamp  `refmt:"hlcTime"`
-    Body           []CommentBlock `refmt:"body"`
-    Signer         core.Principal `refmt:"signer,omitempty"`
-    Sig            core.Signature `refmt:"sig,omitempty"`
-}
-
-// NewComment creates a new Comment blob.
-func NewComment(target string, threadRoot, repliedComment cid.Cid, ts hlc.Timestamp, signer core.KeyPair, delegation cid.Cid, body []CommentBlock) (hb Blob, err error) {
-    c := Comment{
-        Type:           TypeComment,
-        Delegation:     delegation,
-        Target:         target,
-        ThreadRoot:     threadRoot,
-        RepliedComment: repliedComment,
-        HLCTime:        ts,
-        Body:           body,
-        Signer:         signer.Principal(),
-    }
-
-    sigdata, err := cbornode.DumpObject(c)
-    if err != nil {
-        return hb, fmt.Errorf("failed to encode signing bytes for comment: %w", err)
-    }
-
-    c.Sig, err = signer.Sign(sigdata)
-    if err != nil {
-        return hb, fmt.Errorf("failed to sign comment: %w", err)
-    }
-
-    hb, err = EncodeBlob(c)
-    if err != nil {
-        return hb, err
-    }
-
-    return hb, nil
-}
-
-// Verify checks the comment signature.
-func (c Comment) Verify() error {
-    sig := c.Sig
-    c.Sig = nil
-
-    data, err := cbornode.DumpObject(c)
-    if err != nil {
-        return fmt.Errorf("failed to encode signing bytes to verify comment blob: %w", err)
-    }
-
-    if err := c.Signer.Verify(data, sig); err != nil {
-        return err
-    }
-
-    return nil
-}
-
-// Block is a block of text with annotations.
-type Block struct {
-    ID          string            `refmt:"id,omitempty"` // Omitempty when used in Documents.
-    Type        string            `refmt:"type,omitempty"`
-    Text        string            `refmt:"text,omitempty"`
-    Ref         string            `refmt:"ref,omitempty"`
-    Attributes  map[string]string `refmt:"attributes,omitempty"`
-    Annotations []Annotation      `refmt:"annotations,omitempty"`
-}
-
-// Annotation is a range of text that has a type and attributes.
-type Annotation struct {
-    Type       string            `refmt:"type"`
-    Ref        string            `refmt:"ref,omitempty"`
-    Attributes map[string]string `refmt:"attributes,omitempty"`
-    Starts     []int32           `refmt:"starts,omitempty"`
-    Ends       []int32           `refmt:"ends,omitempty"`
-}
-
-// CommentBlock is a block of text with annotations and nested children.
-type CommentBlock struct {
-    Block
-
-    Children []CommentBlock
-}
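A hypothetical reply built with the API above, as if written in this package: body blocks nest via Children, and a reply carries both the thread root and the directly replied comment, matching the invariants indexComment enforces. All inputs are assumed to exist already, and ts must be greater than the replied comment's timestamp:

```go
package hyper

import (
	"seed/backend/core"
	"seed/backend/hlc"

	"github.com/ipfs/go-cid"
)

// makeReply builds a signed reply with one paragraph and a nested embed.
func makeReply(root, replied, delegation cid.Cid, ts hlc.Timestamp, me core.KeyPair) (Blob, error) {
	body := []CommentBlock{{
		Block: Block{ID: "b1", Type: "paragraph", Text: "Agreed, see the linked doc."},
		Children: []CommentBlock{{
			Block: Block{ID: "b2", Type: "embed", Ref: "hm://d/some-doc?v=baf123"},
		}},
	}}
	return NewComment("hm://d/some-doc?v=baf123", root, replied, ts, me, delegation, body)
}
```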