From 2aabe8cbbe81e15386c7d1b7f0ec6d724bf2ea0f Mon Sep 17 00:00:00 2001
From: Alexandr Burdiyan
Date: Thu, 11 Apr 2024 21:03:23 +0200
Subject: [PATCH] fix(backend): fix publication list sorting

---
 .../daemon/api/documents/v1alpha/documents.go | 408 ++++++++----------
 .../api/documents/v1alpha/documents_test.go   |   9 +-
 .../apiutil/{page_token.go => pagination.go}  |  19 +
 backend/daemon/daemon_e2e_test.go             |   6 +-
 frontend/packages/app/models/documents.ts     |   9 +-
 5 files changed, 208 insertions(+), 243 deletions(-)
 rename backend/daemon/apiutil/{page_token.go => pagination.go} (75%)

diff --git a/backend/daemon/api/documents/v1alpha/documents.go b/backend/daemon/api/documents/v1alpha/documents.go
index ba018e084..ea19dc399 100644
--- a/backend/daemon/api/documents/v1alpha/documents.go
+++ b/backend/daemon/api/documents/v1alpha/documents.go
@@ -4,7 +4,6 @@ package documents
 import (
 	"bytes"
 	"context"
-	"encoding/base64"
 	"encoding/hex"
 	"fmt"
 	"math"
@@ -16,7 +15,6 @@ import (
 	groups_proto "mintter/backend/genproto/groups/v1alpha"
 	"mintter/backend/hlc"
 	"mintter/backend/mttnet"
-	"strconv"
 	"strings"
 	"time"
 
@@ -268,10 +266,8 @@ func (api *Server) ListDrafts(ctx context.Context, req *documents.ListDraftsRequ
 		return nil, err
 	}
 
-	resp := &documents.ListDraftsResponse{}
-
-	if req.PageSize == 0 {
-		req.PageSize = 30
+	if err := apiutil.ValidatePageSize(&req.PageSize); err != nil {
+		return nil, err
 	}
 
 	type Cursor struct {
@@ -289,10 +285,13 @@ func (api *Server) ListDrafts(ctx context.Context, req *documents.ListDraftsRequ
 		}
 	}
 
-	var lastCursor Cursor
+	resp := &documents.ListDraftsResponse{}
 
 	if err := api.db.WithSave(ctx, func(conn *sqlite.Conn) error {
-		var count int32
+		var (
+			count      int32
+			lastCursor Cursor
+		)
 
 		return sqlitex.Exec(conn, qListAllDrafts(), func(stmt *sqlite.Stmt) error {
 			// This is necessary to always return empty page token when we reach the last result.
 			if count == req.PageSize {
@@ -312,6 +311,9 @@ func (api *Server) ListDrafts(ctx context.Context, req *documents.ListDraftsRequ
 				cursorResource = stmt.ColumnInt64(6)
 			)
 
+			lastCursor.Resource = cursorResource
+			lastCursor.Ts = updateTime
+
 			for i, x := range editors {
 				data, err := hex.DecodeString(x)
 				if err != nil {
@@ -321,9 +323,6 @@ func (api *Server) ListDrafts(ctx context.Context, req *documents.ListDraftsRequ
 				editors[i] = core.Principal(data).String()
 			}
 
-			lastCursor.Resource = cursorResource
-			lastCursor.Ts = updateTime
-
 			doc := &documents.Document{
 				Id:    iri,
 				Title: meta,
@@ -374,6 +373,7 @@ var qListAllDrafts = dqb.Str(`
 			MAX(cset.ts) AS update_time,
 			authors.principal AS author,
 			GROUP_CONCAT(DISTINCT HEX(public_keys.principal)) AS editors,
+			-- CRDT conflict resolution: greatest timestamp wins, greatest public key is a tie-breaker.
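+			-- E.g. (hypothetical rows): metas written at ts=7 by keys B and A aggregate as [B's meta, A's meta],
+			-- so the ->>'0' below picks B's value: equal timestamps, greater key wins.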
 			(JSONB_GROUP_ARRAY(cset.meta ORDER BY ts DESC, public_keys.principal DESC) FILTER (WHERE cset.meta IS NOT NULL))->>'0' AS meta,
 			MAX(resources.id) AS cursor_resource
 		FROM cset
@@ -717,249 +717,195 @@ func (api *Server) PushPublication(ctx context.Context, in *documents.PushPublic
 }
 
 var qListAllPublications = dqb.Str(`
-	WITH RECURSIVE resource_authors AS (
-		SELECT
-			r.iri,
-			r.create_time,
-			r.owner,
-			mv.meta,
-			pk.principal AS author_raw,
-			sb.ts,
-			sb.id AS blob_id
-		FROM
-			resources r
-			JOIN structural_blobs sb ON r.id = sb.resource
-			JOIN public_keys pk ON sb.author = pk.id
-			JOIN meta_view mv ON r.iri = mv.iri
-		WHERE
-			sb.author IS NOT NULL
-			AND r.iri GLOB :pattern
-			AND sb.id not in (SELECT distinct blob from drafts)
-		UNION ALL
-		SELECT
-			ra.iri,
-			ra.create_time,
-			ra.owner,
-			sb.meta,
-			pk.principal,
-			sb.ts,
-			sb.id
-		FROM
-			resource_authors ra
-			JOIN structural_blobs sb ON ra.iri = sb.resource
-			JOIN public_keys pk ON sb.author = pk.id
-		WHERE
-			sb.author IS NOT NULL
-			AND ra.iri GLOB :pattern
-	),
-	owners_raw AS (
-		SELECT
-			id,
-			principal AS owner_raw
-		FROM
-			public_keys
-	),
-	latest_blobs AS (
+	WITH RECURSIVE
+	-- Selecting owner's changes for each document, and resolving their dependencies.
+	cset (blob, author, ts, resource, meta, iri, create_time, owner) AS (
+		-- Selecting owner's changes that are not drafts.
+		SELECT
+			sb.id,
+			sb.author,
+			sb.ts,
+			sb.resource,
+			sb.meta,
+			resources.iri,
+			resources.create_time,
+			resources.owner
+		FROM resources
+		JOIN structural_blobs sb ON sb.resource = resources.id AND resources.owner = sb.author
+		LEFT JOIN drafts ON drafts.blob = sb.id
+		WHERE resources.iri GLOB 'hm://d/*'
+		AND drafts.blob IS NULL
+		UNION
+		-- Resolving the dependencies.
+		SELECT
+			sb.id,
+			sb.author,
+			sb.ts,
+			sb.resource,
+			sb.meta,
+			cset.iri,
+			cset.create_time,
+			cset.owner
+		FROM blob_links
+		JOIN cset ON cset.blob = blob_links.source AND blob_links.type = 'change/dep'
+		JOIN structural_blobs sb ON sb.id = blob_links.target AND sb.resource = cset.resource
+	)
+	-- Processing the changes grouping by resource.
 	SELECT
-		ra.iri,
-		MAX(ra.ts) AS latest_ts,
-		b.multihash,
-		b.codec
-	FROM
-		resource_authors ra
-		JOIN blobs b ON ra.blob_id = b.id
-	GROUP BY ra.iri
-	)
-	SELECT
-		ra.iri,
-		ra.create_time,
-		GROUP_CONCAT(DISTINCT HEX(ra.author_raw)) AS authors_hex,
-		ra.meta,
-		MAX(ra.ts) AS latest_ts,
-		HEX(oraw.owner_raw),
-		lb.multihash AS latest_multihash,
-		lb.codec AS latest_codec,
-		ra.blob_id
-	FROM
-		resource_authors ra
-		LEFT JOIN owners_raw oraw ON ra.owner = oraw.id
-		LEFT JOIN latest_blobs lb ON ra.iri = lb.iri
-	WHERE ra.blob_id <= :idx
-	GROUP BY
-		ra.iri, ra.create_time, ra.meta
-	ORDER BY ra.blob_id asc LIMIT :page_size;
+		cset.iri AS iri,
+		cset.create_time AS create_time,
+		MAX(cset.ts) AS update_time,
+		owners.principal AS author,
+		GROUP_CONCAT(DISTINCT HEX(editors.principal)) AS editors,
+		-- CRDT conflict resolution: greatest timestamp wins, greatest public key is a tie-breaker.
+		(JSONB_GROUP_ARRAY(cset.meta ORDER BY cset.ts DESC, editors.principal DESC) FILTER (WHERE cset.meta IS NOT NULL))->>'0' AS meta
+	FROM cset
+	JOIN public_keys owners ON owners.id = cset.owner
+	JOIN public_keys editors ON editors.id = cset.author
+	GROUP BY resource HAVING (update_time < :cursor_update_time AND iri < :cursor_iri)
+	ORDER BY update_time DESC, iri DESC
+	LIMIT :page_size + 1;
 `)
 
 var qListTrustedPublications = dqb.Str(`
-	WITH RECURSIVE resource_authors AS (
-		SELECT
-			r.iri,
-			r.create_time,
-			r.owner,
-			mv.meta,
-			pk.principal AS author_raw,
-			sb.ts,
-			sb.id AS blob_id
-		FROM
-			resources r
-			JOIN structural_blobs sb ON r.id = sb.resource
-			JOIN public_keys pk ON sb.author = pk.id
-			JOIN meta_view mv ON r.iri = mv.iri
-			JOIN trusted_accounts ON trusted_accounts.id = r.owner
-		WHERE
-			sb.author IS NOT NULL
-			AND r.iri GLOB :pattern
-			AND r.id not in (SELECT resource from drafts)
-		UNION ALL
-		SELECT
-			ra.iri,
-			ra.create_time,
-			ra.owner,
-			sb.meta,
-			pk.principal,
-			sb.ts,
-			sb.id
-		FROM
-			resource_authors ra
-			JOIN structural_blobs sb ON ra.iri = sb.resource
-			JOIN public_keys pk ON sb.author = pk.id
-		WHERE
-			sb.author IS NOT NULL
-			AND ra.iri GLOB :pattern
-	),
-	owners_raw AS (
-		SELECT
-			id,
-			principal AS owner_raw
-		FROM
-			public_keys
-	),
-	latest_blobs AS (
+	WITH RECURSIVE
+	-- Selecting owner's changes for each document, and resolving their dependencies.
+	cset (blob, author, ts, resource, meta, iri, create_time, owner) AS (
+		-- Selecting owner's changes that are not drafts.
+		SELECT
+			sb.id,
+			sb.author,
+			sb.ts,
+			sb.resource,
+			sb.meta,
+			resources.iri,
+			resources.create_time,
+			resources.owner
+		FROM resources
+		JOIN trusted_accounts ON trusted_accounts.id = resources.owner
+		JOIN structural_blobs sb ON sb.resource = resources.id AND resources.owner = sb.author
+		LEFT JOIN drafts ON drafts.blob = sb.id
+		WHERE resources.iri GLOB 'hm://d/*'
+		AND drafts.blob IS NULL
+		UNION
+		-- Resolving the dependencies.
+		SELECT
+			sb.id,
+			sb.author,
+			sb.ts,
+			sb.resource,
+			sb.meta,
+			cset.iri,
+			cset.create_time,
+			cset.owner
+		FROM blob_links
+		JOIN cset ON cset.blob = blob_links.source AND blob_links.type = 'change/dep'
+		JOIN structural_blobs sb ON sb.id = blob_links.target AND sb.resource = cset.resource
+	)
+	-- Processing the changes grouping by resource.
 	SELECT
-		ra.iri,
-		MAX(ra.ts) AS latest_ts,
-		b.multihash,
-		b.codec
-	FROM
-		resource_authors ra
-		JOIN blobs b ON ra.blob_id = b.id
-	GROUP BY ra.iri
-	)
-	SELECT
-		ra.iri,
-		ra.create_time,
-		GROUP_CONCAT(DISTINCT HEX(ra.author_raw)) AS authors_hex,
-		ra.meta,
-		MAX(ra.ts) AS latest_ts,
-		HEX(oraw.owner_raw),
-		lb.multihash AS latest_multihash,
-		lb.codec AS latest_codec,
-		ra.blob_id
-	FROM
-		resource_authors ra
-		LEFT JOIN owners_raw oraw ON ra.owner = oraw.id
-		LEFT JOIN latest_blobs lb ON ra.iri = lb.iri
-	WHERE ra.blob_id <= :idx
-	GROUP BY
-		ra.iri, ra.create_time, ra.meta
-	ORDER BY ra.blob_id asc LIMIT :page_size;
+		cset.iri AS iri,
+		cset.create_time AS create_time,
+		MAX(cset.ts) AS update_time,
+		owners.principal AS author,
+		GROUP_CONCAT(DISTINCT HEX(editors.principal)) AS editors,
+		-- CRDT conflict resolution: greatest timestamp wins, greatest public key is a tie-breaker.
+		(JSONB_GROUP_ARRAY(cset.meta ORDER BY cset.ts DESC, editors.principal DESC) FILTER (WHERE cset.meta IS NOT NULL))->>'0' AS meta
+	FROM cset
+	JOIN public_keys owners ON owners.id = cset.owner
+	JOIN public_keys editors ON editors.id = cset.author
+	GROUP BY resource HAVING (update_time < :cursor_update_time AND iri < :cursor_iri)
+	ORDER BY update_time DESC, iri DESC
+	LIMIT :page_size + 1;
 `)
 
 // ListPublications implements the corresponding gRPC method.
 func (api *Server) ListPublications(ctx context.Context, in *documents.ListPublicationsRequest) (*documents.ListPublicationsResponse, error) {
-	var (
-		entities []hyper.EntityID
-		err      error
-	)
 	me, ok := api.me.Get()
 	if !ok {
 		return nil, fmt.Errorf("account is not initialized yet")
 	}
-	conn, cancel, err := api.db.Conn(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("Can't get a connection from the db: %w", err)
-	}
-	defer cancel()
-	resp := &documents.ListPublicationsResponse{
-		Publications: make([]*documents.Publication, 0, len(entities)),
+
+	if err := apiutil.ValidatePageSize(&in.PageSize); err != nil {
+		return nil, err
 	}
-	var cursorBlobID int64 = math.MaxInt32
-	if in.PageSize == 0 {
-		in.PageSize = 30
+
+	type Cursor struct {
+		UpdateTime int64  `json:"u"`
+		IRI        string `json:"i"`
 	}
-	if in.PageToken != "" {
-		pageTokenBytes, _ := base64.StdEncoding.DecodeString(in.PageToken)
-		if err != nil {
-			return nil, fmt.Errorf("Token encoding not valid: %w", err)
-		}
-		clearPageToken, err := me.DeviceKey().Decrypt(pageTokenBytes)
-		if err != nil {
-			return nil, fmt.Errorf("Token not valid: %w", err)
-		}
-		pageToken, err := strconv.ParseUint(string(clearPageToken), 10, 32)
-		if err != nil {
-			return nil, fmt.Errorf("Token not valid: %w", err)
+
+	var cursor Cursor
+	if in.PageToken == "" {
+		cursor.UpdateTime = math.MaxInt64
+		cursor.IRI = string([]rune{0xFFFF}) // Sentinel that sorts after any real IRI.
+	} else {
+		if err := apiutil.DecodePageToken(in.PageToken, &cursor, me.DeviceKey()); err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "%v", err)
 		}
-		cursorBlobID = int64(pageToken)
 	}
-	pattern := "hm://d/*"
-	query := qListAllPublications
-	if in.TrustedOnly {
-		query = qListTrustedPublications
-	}
-	var lastBlobID int64
-	err = sqlitex.Exec(conn, query(), func(stmt *sqlite.Stmt) error {
+
+	resp := &documents.ListPublicationsResponse{}
+
+	if err := api.db.WithSave(ctx, func(conn *sqlite.Conn) error {
 		var (
-			id          = stmt.ColumnText(0)
-			createTime  = stmt.ColumnInt64(1)
-			editorsStr  = stmt.ColumnText(2)
-			title       = stmt.ColumnText(3)
-			updatedTime = stmt.ColumnInt64(4)
-			ownerHex    = stmt.ColumnText(5)
-			mhash       = stmt.ColumnBytes(6)
-			codec       = stmt.ColumnInt64(7)
+			count      int32
+			lastCursor Cursor
 		)
-		lastBlobID = stmt.ColumnInt64(7)
-		editors := []string{}
-		for _, editorHex := range strings.Split(editorsStr, ",") {
-			editorBin, err := hex.DecodeString(editorHex)
-			if err != nil {
+
+		q := qListAllPublications
+		if in.TrustedOnly {
+			q = qListTrustedPublications
+		}
+
+		return sqlitex.Exec(conn, q(), func(stmt *sqlite.Stmt) error {
+			if count == in.PageSize {
+				var err error
+				resp.NextPageToken, err = apiutil.EncodePageToken(lastCursor, me.DeviceKey())
 				return err
 			}
-			editors = append(editors, core.Principal(editorBin).String())
-		}
-		ownerBin, err := hex.DecodeString(ownerHex)
-		if err != nil {
-			return err
-		}
-		version := cid.NewCidV1(uint64(codec), mhash)
-		pub := &documents.Publication{
-			Version: version.String(),
-			Document: &documents.Document{
-				Id:       id,
-				Title:    title,
-				Author:   core.Principal(ownerBin).String(),
-				Editors:  editors,
-				Children: []*documents.BlockNode{},
-
-				CreateTime:  timestamppb.New(time.Unix(int64(createTime), 0)),
-				UpdateTime:  timestamppb.New(time.Unix(int64(updatedTime/1000000), (updatedTime%1000000)*1000)),
-				PublishTime: timestamppb.New(time.Unix(int64(updatedTime/1000000), (updatedTime%1000000)*1000)),
-			},
-		}
-		resp.Publications = append(resp.Publications, pub)
-		return nil
-	}, pattern, cursorBlobID, in.PageSize)
-	if err != nil {
-		return nil, err
-	}
-	pageToken, err := me.DeviceKey().Encrypt([]byte(strconv.Itoa(int(lastBlobID - 1))))
-	if err != nil {
+
+			count++
+
+			var (
+				iri        = stmt.ColumnText(0)
+				createTime = stmt.ColumnInt64(1)
+				updateTime = stmt.ColumnInt64(2)
+				author     = stmt.ColumnBytesUnsafe(3)
+				editors    = strings.Split(stmt.ColumnText(4), ",")
+				meta       = stmt.ColumnText(5)
+			)
+
+			lastCursor.UpdateTime = updateTime
+			lastCursor.IRI = iri
+
+			for i, x := range editors {
+				data, err := hex.DecodeString(x)
+				if err != nil {
+					return fmt.Errorf("failed to decode editor: %w", err)
+				}
+				editors[i] = core.Principal(data).String()
+			}
+
+			pub := &documents.Publication{
+				Document: &documents.Document{
+					Id:         iri,
+					Title:      meta,
+					Author:     core.Principal(author).String(),
+					Editors:    editors,
+					CreateTime: timestamppb.New(time.Unix(createTime, 0)),
+					UpdateTime: timestamppb.New(hlc.Timestamp(updateTime).Time()),
+				},
+			}
+			pub.Document.PublishTime = pub.Document.UpdateTime
+
+			resp.Publications = append(resp.Publications, pub)
+
+			return nil
+		}, cursor.UpdateTime, cursor.IRI, in.PageSize)
+	}); err != nil {
 		return nil, err
 	}
-	if lastBlobID != 0 && in.PageSize == int32(len(resp.Publications)) {
-		resp.NextPageToken = base64.StdEncoding.EncodeToString(pageToken)
-	}
+
 	return resp, nil
 }
diff --git a/backend/daemon/api/documents/v1alpha/documents_test.go b/backend/daemon/api/documents/v1alpha/documents_test.go
index ab89b432c..99ce56026 100644
--- a/backend/daemon/api/documents/v1alpha/documents_test.go
+++ b/backend/daemon/api/documents/v1alpha/documents_test.go
@@ -502,7 +502,6 @@ func TestAPIPublishDraft_E2E(t *testing.T) {
 	published, err := api.PublishDraft(ctx, &documents.PublishDraftRequest{DocumentId: draft.Id})
 	require.NoError(t, err)
 
-	published.Document.Children = []*documents.BlockNode{}
 	updated.PublishTime = published.Document.PublishTime // Drafts don't have publish time.
 	updated.Children = published.Document.Children
 	diff := cmp.Diff(updated, published.Document, testutil.ExportedFieldsFilter())
@@ -531,9 +530,7 @@ func TestAPIPublishDraft_E2E(t *testing.T) {
 		require.Error(t, err, "must fail to get published draft")
 	}
 
-	// Must get publication after publishing.
 	got, err := api.GetPublication(ctx, &documents.GetPublicationRequest{DocumentId: draft.Id})
-	got.Document.Children = []*documents.BlockNode{}
 	require.NoError(t, err, "must get document after publishing")
 	testutil.ProtoEqual(t, published, got, "published document doesn't match")
 
@@ -542,6 +539,8 @@ func TestAPIPublishDraft_E2E(t *testing.T) {
 		list, err := api.ListPublications(ctx, &documents.ListPublicationsRequest{})
 		require.NoError(t, err)
 		require.Len(t, list.Publications, 1, "must have 1 publication")
+		published.Document.Children = nil
+		published.Version = ""
 		testutil.ProtoEqual(t, published, list.Publications[0], "publication in the list must match")
 	}
 }
@@ -701,7 +700,6 @@ func TestCreateDraftFromPublication(t *testing.T) {
 	})
 
 	pub2, err := api.PublishDraft(ctx, &documents.PublishDraftRequest{DocumentId: draft2.Id})
-	pub2.Document.Children = []*documents.BlockNode{}
 	require.NoError(t, err)
 	require.NotNil(t, pub2)
 
@@ -712,6 +710,9 @@ func TestCreateDraftFromPublication(t *testing.T) {
 	pubs, err := api.ListPublications(ctx, &documents.ListPublicationsRequest{})
 	require.NoError(t, err)
 	require.Len(t, pubs.Publications, 1)
+
+	pub2.Document.Children = nil
+	pub2.Version = ""
 	testutil.ProtoEqual(t, pub2, pubs.Publications[0], "publication in the list must be the same as published")
 }
diff --git a/backend/daemon/apiutil/page_token.go b/backend/daemon/apiutil/pagination.go
similarity index 75%
rename from backend/daemon/apiutil/page_token.go
rename to backend/daemon/apiutil/pagination.go
index dcef2578d..9f436a7c8 100644
--- a/backend/daemon/apiutil/page_token.go
+++ b/backend/daemon/apiutil/pagination.go
@@ -5,6 +5,9 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // Key is an interface for an encryption key.
@@ -52,3 +55,19 @@ func DecodePageToken(token string, value any, key Key) error {
 
 	return nil
 }
+
+// ValidatePageSize validates the provided page size.
+// The argument is a pointer because it sets the default value if the provided one is zero.
+func ValidatePageSize(pageSize *int32) error {
+	const defaultSize = 30
+
+	if *pageSize < 0 {
+		return status.Errorf(codes.InvalidArgument, "bad page size %d: must not be negative", *pageSize)
+	}
+
+	if *pageSize == 0 {
+		*pageSize = defaultSize
+	}
+
+	return nil
+}
diff --git a/backend/daemon/daemon_e2e_test.go b/backend/daemon/daemon_e2e_test.go
index e593de91e..24bc605de 100644
--- a/backend/daemon/daemon_e2e_test.go
+++ b/backend/daemon/daemon_e2e_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -255,9 +256,12 @@ func TestBug_PublicationsListInconsistent(t *testing.T) {
 	want := []*documents.Publication{}
 	for i := 1; i <= 4; i++ {
 		doc := publish(ctx, t, "Doc-"+strconv.Itoa(i), "This is a doc-"+strconv.Itoa(i))
-		doc.Document.Children = []*documents.BlockNode{}
+		doc.Document.Children = nil
+		doc.Version = ""
 		want = append(want, doc)
 	}
+	slices.Reverse(want) // Most recently updated docs are returned first.
+
 	var g errgroup.Group
 
 	// Trying this more than once and expecting it to return the same result. This is what bug was mostly about.
diff --git a/frontend/packages/app/models/documents.ts b/frontend/packages/app/models/documents.ts
index aa241e0d6..e5d6d811c 100644
--- a/frontend/packages/app/models/documents.ts
+++ b/frontend/packages/app/models/documents.ts
@@ -82,13 +82,8 @@ export function usePublicationList(
         pageSize: 50,
         pageToken: context.pageParam,
       })
-      let publications =
-        result.publications.sort((a, b) =>
-          sortDocuments(a.document?.updateTime, b.document?.updateTime),
-        ) || []
-      // publications = publications.filter((pub) => {
-      //   return pub.document?.title !== '(HIDDEN) Group Navigation'
-      // })
+      const publications = result.publications || []
+
       return {
         ...result,
         publications,
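
Note (illustrative, not part of the commit): the listing endpoints in this patch all follow the same keyset-pagination pattern: validate the page size, resolve the page token into an (update_time, iri)-style cursor, query strictly "before" that cursor, and hand back an encrypted token for the last row of a full page. The sketch below shows that flow using only the helpers this patch touches (apiutil.ValidatePageSize, apiutil.DecodePageToken); the Cursor type and function names are made up for the example.

package main

import (
	"fmt"
	"math"

	"mintter/backend/daemon/apiutil"
)

// Cursor mirrors the shape of the per-endpoint cursor structs in documents.go.
type Cursor struct {
	UpdateTime int64  `json:"u"`
	IRI        string `json:"i"`
}

// resolveCursor reproduces the token handling in ListPublications: an empty
// token means "first page", expressed as sentinels that compare greater than
// any real (update_time, iri) pair, so the DESC-ordered query starts at the top.
func resolveCursor(token string, key apiutil.Key) (Cursor, error) {
	var c Cursor
	if token == "" {
		c.UpdateTime = math.MaxInt64   // After any real HLC timestamp.
		c.IRI = string([]rune{0xFFFF}) // After any real document IRI.
		return c, nil
	}
	err := apiutil.DecodePageToken(token, &c, key)
	return c, err
}

func main() {
	pageSize := int32(0)
	if err := apiutil.ValidatePageSize(&pageSize); err != nil {
		panic(err)
	}
	first, _ := resolveCursor("", nil) // No token to decrypt, so a nil key is fine here.
	fmt.Printf("page size %d, scanning rows before (%d, %q)\n", pageSize, first.UpdateTime, first.IRI)
}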