From e66c311c209d14f8ef2a27c07d3d2ff632848a62 Mon Sep 17 00:00:00 2001
From: Viacheslav
Date: Mon, 20 May 2024 19:30:01 +0300
Subject: [PATCH] fix(blob/service): fix handling of the padding shares (#3404)

Fixes an issue with missing blobs. The parser incorrectly assumed that at
most one padding share can appear between two blobs, so the Blob service
skipped only a single share where it should have skipped several.

* Fixed the parsing logic so that multiple consecutive padding shares are
  skipped;
* Added a test case that retrieves the EDS and finds the correct number of
  blobs;
---
 blob/parser.go       |  26 ++++----
 blob/service.go      |   2 +
 blob/service_test.go | 141 +++++++++++++++++++++++++++++++++----------
 3 files changed, 125 insertions(+), 44 deletions(-)

diff --git a/blob/parser.go b/blob/parser.go
index 51f8a3a17c..0d148ddde1 100644
--- a/blob/parser.go
+++ b/blob/parser.go
@@ -117,19 +117,21 @@ func (p *parser) skipPadding(shares []shares.Share) ([]shares.Share, error) {
 		return nil, errEmptyShares
 	}
 
-	isPadding, err := shares[0].IsPadding()
-	if err != nil {
-		return nil, err
-	}
-
-	if !isPadding {
-		return shares, nil
+	offset := 0
+	for _, sh := range shares {
+		isPadding, err := sh.IsPadding()
+		if err != nil {
+			return nil, err
+		}
+		if !isPadding {
+			break
+		}
+		offset++
 	}
-
-	// update blob index if we are going to skip one share
-	p.index++
-	if len(shares) > 1 {
-		return shares[1:], nil
+	// advance the blob index by the number of skipped padding shares
+	p.index += offset
+	if len(shares) > offset {
+		return shares[offset:], nil
 	}
 	return nil, nil
 }
diff --git a/blob/service.go b/blob/service.go
index 1a81a1ccd4..d75e3a3bfd 100644
--- a/blob/service.go
+++ b/blob/service.go
@@ -330,6 +330,8 @@ func (s *Service) retrieve(
 		shrs, err = sharesParser.set(rowIndex*len(header.DAH.RowRoots)+index, appShares)
 		if err != nil {
 			if errors.Is(err, errEmptyShares) {
+				// reset the parser, as `skipPadding` can update the next blob's index
+				sharesParser.reset()
 				appShares = nil
 				break
 			}
diff --git a/blob/service_test.go b/blob/service_test.go
index 50b18da3bc..46c8794682 100644
--- a/blob/service_test.go
+++ b/blob/service_test.go
@@ -9,16 +9,15 @@ import (
 	"testing"
 	"time"
 
+	"github.com/celestiaorg/celestia-app/pkg/appconsts"
+	"github.com/celestiaorg/celestia-app/pkg/shares"
+	"github.com/celestiaorg/go-header/store"
 	ds "github.com/ipfs/go-datastore"
 	ds_sync "github.com/ipfs/go-datastore/sync"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	tmrand "github.com/tendermint/tendermint/libs/rand"
 
-	"github.com/celestiaorg/celestia-app/pkg/appconsts"
-	"github.com/celestiaorg/celestia-app/pkg/shares"
-	"github.com/celestiaorg/go-header/store"
-
 	"github.com/celestiaorg/celestia-node/blob/blobtest"
 	"github.com/celestiaorg/celestia-node/header"
 	"github.com/celestiaorg/celestia-node/header/headertest"
@@ -425,64 +424,68 @@ func TestService_Get(t *testing.T) {
 	}
 }
 
+// TestService_GetAllWithoutPadding retrieves all blobs under the given namespace;
+// the number of blobs is known and equal to 5. It then ensures that each blob has the correct index inside the EDS
+// by requesting the corresponding share and comparing them.
 func TestService_GetAllWithoutPadding(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 	t.Cleanup(cancel)
 
-	appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5}, true)
+	appBlob, err := blobtest.GenerateV0Blobs([]int{9, 5, 15, 4, 24}, true)
 	require.NoError(t, err)
 	blobs, err := convertBlobs(appBlob...)
 	require.NoError(t, err)
-	ns1, ns2 := blobs[0].Namespace().ToAppNamespace(), blobs[1].Namespace().ToAppNamespace()
+	var (
+		ns        = blobs[0].Namespace().ToAppNamespace()
+		rawShares = make([][]byte, 0)
+	)
 
-	padding0, err := shares.NamespacePaddingShare(ns1, appconsts.ShareVersionZero)
-	require.NoError(t, err)
-	padding1, err := shares.NamespacePaddingShare(ns2, appconsts.ShareVersionZero)
+	padding, err := shares.NamespacePaddingShare(ns, appconsts.ShareVersionZero)
 	require.NoError(t, err)
-	rawShares0, err := BlobsToShares(blobs[0])
+
+	for i := 0; i < 2; i++ {
+		sh, err := BlobsToShares(blobs[i])
+		require.NoError(t, err)
+		rawShares = append(rawShares, append(sh, padding.ToBytes())...)
+	}
+
+	sh, err := BlobsToShares(blobs[2])
 	require.NoError(t, err)
-	rawShares1, err := BlobsToShares(blobs[1])
+	rawShares = append(rawShares, append(sh, padding.ToBytes(), padding.ToBytes())...)
+
+	sh, err = BlobsToShares(blobs[3])
 	require.NoError(t, err)
-	rawShares := make([][]byte, 0)
+	rawShares = append(rawShares, append(sh, padding.ToBytes(), padding.ToBytes(), padding.ToBytes())...)
 
-	// create shares in correct order with padding shares
-	if bytes.Compare(blobs[0].Namespace(), blobs[1].Namespace()) <= 0 {
-		rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...)
-		rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...)
-	} else {
-		rawShares = append(rawShares, append(rawShares1, padding1.ToBytes())...)
-		rawShares = append(rawShares, append(rawShares0, padding0.ToBytes())...)
-	}
+	sh, err = BlobsToShares(blobs[4])
+	require.NoError(t, err)
+	rawShares = append(rawShares, sh...)
 
 	bs := ipld.NewMemBlockservice()
-	batching := ds_sync.MutexWrap(ds.NewMapDatastore())
-	headerStore, err := store.NewStore[*header.ExtendedHeader](batching)
 	require.NoError(t, err)
 	eds, err := ipld.AddShares(ctx, rawShares, bs)
 	require.NoError(t, err)
 
 	h := headertest.ExtendedHeaderFromEDS(t, 1, eds)
-	err = headerStore.Init(ctx, h)
-	require.NoError(t, err)
 
 	fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
-		return headerStore.GetByHeight(ctx, height)
+		return h, nil
 	}
 
 	service := NewService(nil, getters.NewIPLDGetter(bs), fn)
 
-	blobs, err = service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace(), blobs[1].Namespace()})
+	newBlobs, err := service.GetAll(ctx, 1, []share.Namespace{blobs[0].Namespace()})
 	require.NoError(t, err)
+	assert.Equal(t, len(newBlobs), len(blobs))
 
-	resultShares, err := BlobsToShares(blobs...)
+	resultShares, err := BlobsToShares(newBlobs...)
 	require.NoError(t, err)
-	sort.Slice(blobs, func(i, j int) bool {
-		val := bytes.Compare(blobs[i].NamespaceId, blobs[j].NamespaceId)
-		return val < 0
-	})
+	shareOffset := 0
 
-	for _, blob := range blobs {
+	for i, blob := range newBlobs {
+		require.True(t, blobs[i].compareCommitments(blob.Commitment))
+
 		row, col := calculateIndex(len(h.DAH.RowRoots), blob.index)
 		sh, err := service.shareGetter.GetShare(ctx, h, row, col)
 		require.NoError(t, err)
@@ -492,6 +495,80 @@ func TestService_GetAllWithoutPadding(t *testing.T) {
 	}
 }
 
+func TestAllPaddingSharesInEDS(t *testing.T) {
+	nid, err := share.NewBlobNamespaceV0(tmrand.Bytes(7))
+	require.NoError(t, err)
+	padding, err := shares.NamespacePaddingShare(nid.ToAppNamespace(), appconsts.ShareVersionZero)
+	require.NoError(t, err)
+
+	rawShares := make([]share.Share, 16)
+	for i := 0; i < 16; i++ {
+		rawShares[i] = padding.ToBytes()
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	t.Cleanup(cancel)
+
+	bs := ipld.NewMemBlockservice()
+	require.NoError(t, err)
+	eds, err := ipld.AddShares(ctx, rawShares, bs)
+	require.NoError(t, err)
+
+	h := headertest.ExtendedHeaderFromEDS(t, 1, eds)
+
+	fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
+		return h, nil
+	}
+
+	service := NewService(nil, getters.NewIPLDGetter(bs), fn)
+	_, err = service.GetAll(ctx, 1, []share.Namespace{nid})
+	require.Error(t, err)
+}
+
+func TestSkipPaddingsAndRetrieveBlob(t *testing.T) {
+	nid, err := share.NewBlobNamespaceV0(tmrand.Bytes(7))
+	require.NoError(t, err)
+	padding, err := shares.NamespacePaddingShare(nid.ToAppNamespace(), appconsts.ShareVersionZero)
+	require.NoError(t, err)
+
+	rawShares := make([]share.Share, 0, 64)
+	for i := 0; i < 58; i++ {
+		rawShares = append(rawShares, padding.ToBytes())
+	}
+
+	appBlob, err := blobtest.GenerateV0Blobs([]int{6}, true)
+	require.NoError(t, err)
+	appBlob[0].NamespaceVersion = nid[0]
+	appBlob[0].NamespaceID = nid[1:]
+
+	blobs, err := convertBlobs(appBlob...)
+	require.NoError(t, err)
+	sh, err := BlobsToShares(blobs[0])
+	require.NoError(t, err)
+
+	rawShares = append(rawShares, sh...)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	t.Cleanup(cancel)
+
+	bs := ipld.NewMemBlockservice()
+	require.NoError(t, err)
+	eds, err := ipld.AddShares(ctx, rawShares, bs)
+	require.NoError(t, err)
+
+	h := headertest.ExtendedHeaderFromEDS(t, 1, eds)
+
+	fn := func(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
+		return h, nil
+	}
+
+	service := NewService(nil, getters.NewIPLDGetter(bs), fn)
+	newBlob, err := service.GetAll(ctx, 1, []share.Namespace{nid})
+	require.NoError(t, err)
+	require.Len(t, newBlob, 1)
+	require.True(t, newBlob[0].compareCommitments(blobs[0].Commitment))
+}
+
 // BenchmarkGetByCommitment-12    	    1869	    571663 ns/op	 1085371 B/op	    6414 allocs/op
 func BenchmarkGetByCommitment(b *testing.B) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
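
For readers outside the diff context, the standalone sketch below illustrates the corrected skipping logic. The `share` type and the `skipPadding` signature here are simplified stand-ins for the real `shares.Share` type and parser method from celestia-app (which also propagates errors from `IsPadding()`); only the counting logic mirrors the patch.

package main

import "fmt"

// share is a stand-in for the celestia-app shares.Share type; for this
// sketch, only the padding flag matters.
type share struct{ padding bool }

func (s share) IsPadding() bool { return s.padding }

// skipPadding mirrors the fixed parser logic: count every leading padding
// share instead of assuming there is at most one, advance the blob's start
// index by that count, and return the remaining shares (nil when the whole
// range is padding).
func skipPadding(index int, shrs []share) (int, []share) {
	offset := 0
	for _, sh := range shrs {
		if !sh.IsPadding() {
			break
		}
		offset++
	}
	// The pre-fix code did the equivalent of index++ and shrs[1:],
	// silently treating the second padding share as blob data.
	index += offset
	if len(shrs) > offset {
		return index, shrs[offset:]
	}
	return index, nil
}

func main() {
	// Two padding shares precede the blob share.
	shrs := []share{{padding: true}, {padding: true}, {padding: false}}
	idx, rest := skipPadding(10, shrs)
	fmt.Println(idx, len(rest)) // Output: 12 1
}

The `sharesParser.reset()` added in service.go complements this: skipping padding mutates the parser's blob index, so when a share range turns out to be all padding (`errEmptyShares`), that partially updated state must be discarded before the parser is reused for the next row.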