diff --git a/process/elasticproc/converters/tags.go b/process/elasticproc/converters/tags.go index 1bdb119b..3af2baa0 100644 --- a/process/elasticproc/converters/tags.go +++ b/process/elasticproc/converters/tags.go @@ -5,6 +5,9 @@ import ( ) const ( + // MaxIDSize is the maximum size of a document id + MaxIDSize = 512 + attributesSeparator = ";" keyValuesSeparator = ":" valuesSeparator = "," diff --git a/process/elasticproc/miniblocks/miniblocksProcessor.go b/process/elasticproc/miniblocks/miniblocksProcessor.go index db8405e8..c4c1c061 100644 --- a/process/elasticproc/miniblocks/miniblocksProcessor.go +++ b/process/elasticproc/miniblocks/miniblocksProcessor.go @@ -48,8 +48,14 @@ func (mp *miniblocksProcessor) PrepareDBMiniblocks(header coreData.HeaderHandler return nil } + selfShard := header.GetShardID() dbMiniblocks := make([]*data.Miniblock, 0) for mbIndex, miniBlock := range miniBlocks { + if miniBlock.ReceiverShardID == core.AllShardId && selfShard != core.MetachainShardId { + // will not index the miniblock on the destination if is for all shards + continue + } + dbMiniBlock, errPrepareMiniBlock := mp.prepareMiniblockForDB(mbIndex, miniBlock, header, headerHash) if errPrepareMiniBlock != nil { log.Warn("miniblocksProcessor.PrepareDBMiniBlocks cannot prepare miniblock", "error", errPrepareMiniBlock) diff --git a/process/elasticproc/tags/serialize.go b/process/elasticproc/tags/serialize.go index be13563f..1de2f129 100644 --- a/process/elasticproc/tags/serialize.go +++ b/process/elasticproc/tags/serialize.go @@ -16,12 +16,16 @@ func (tc *tagsCount) Serialize(buffSlice *data.BufferSlice, index string) error } base64Tag := base64.StdEncoding.EncodeToString([]byte(tag)) + if len(base64Tag) > converters.MaxIDSize { + base64Tag = base64Tag[:converters.MaxIDSize] + } meta := []byte(fmt.Sprintf(`{ "update" : {"_index":"%s", "_id" : "%s" } }%s`, index, converters.JsonEscape(base64Tag), "\n")) codeToExecute := ` ctx._source.count += params.count; ctx._source.tag = params.tag ` + serializedDataStr := fmt.Sprintf(`{"script": {"source": "%s","lang": "painless","params": {"count": %d, "tag": "%s"}},"upsert": {"count": %d, "tag":"%s"}}`, converters.FormatPainlessSource(codeToExecute), count, converters.JsonEscape(tag), count, converters.JsonEscape(tag), ) diff --git a/process/elasticproc/tags/serialize_test.go b/process/elasticproc/tags/serialize_test.go index 80e69d7d..ffeeadf5 100644 --- a/process/elasticproc/tags/serialize_test.go +++ b/process/elasticproc/tags/serialize_test.go @@ -1,9 +1,13 @@ package tags import ( + "crypto/rand" + "encoding/base64" + "fmt" "testing" "github.com/multiversx/mx-chain-es-indexer-go/data" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters" "github.com/stretchr/testify/require" ) @@ -24,3 +28,23 @@ func TestTagsCount_Serialize(t *testing.T) { ` require.Equal(t, expected, buffSlice.Buffers()[0].String()) } + +func TestTagsCount_TruncateID(t *testing.T) { + t.Parallel() + + tagsC := NewTagsCount() + + randomBytes := make([]byte, 600) + _, _ = rand.Read(randomBytes) + + tagsC.ParseTags([]string{string(randomBytes)}) + + buffSlice := data.NewBufferSlice(data.DefaultMaxBulkSize) + err := tagsC.Serialize(buffSlice, "tags") + require.Nil(t, err) + + expected := fmt.Sprintf(`{ "update" : {"_index":"tags", "_id" : "%s" } } +{"script": {"source": "ctx._source.count += params.count; ctx._source.tag = params.tag","lang": "painless","params": {"count": 1, "tag": "%s"}},"upsert": {"count": 1, "tag":"%s"}} +`, base64.StdEncoding.EncodeToString(randomBytes)[:converters.MaxIDSize], converters.JsonEscape(string(randomBytes)), converters.JsonEscape(string(randomBytes))) + require.Equal(t, expected, buffSlice.Buffers()[0].String()) +}