diff --git a/cmd/elasticindexer/config/config.toml b/cmd/elasticindexer/config/config.toml index 35fa0702..b63329c7 100644 --- a/cmd/elasticindexer/config/config.toml +++ b/cmd/elasticindexer/config/config.toml @@ -2,7 +2,7 @@ available-indices = [ "rating", "transactions", "blocks", "validators", "miniblocks", "rounds", "accounts", "accountshistory", "receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo", "scdeploys", "tokens", "tags", - "logs", "delegators", "operations", "esdts" + "logs", "delegators", "operations", "esdts", "values", "events" ] [config.address-converter] length = 32 diff --git a/data/event.go b/data/event.go new file mode 100644 index 00000000..f3e22668 --- /dev/null +++ b/data/event.go @@ -0,0 +1,20 @@ +package data + +import "time" + +// LogEvent is the dto for the log event structure +type LogEvent struct { + ID string `json:"-"` + TxHash string `json:"txHash"` + OriginalTxHash string `json:"originalTxHash,omitempty"` + LogAddress string `json:"logAddress"` + Address string `json:"address"` + Identifier string `json:"identifier"` + Data string `json:"data,omitempty"` + AdditionalData []string `json:"additionalData,omitempty"` + Topics []string `json:"topics"` + Order int `json:"order"` + TxOrder int `json:"txOrder"` + ShardID uint32 `json:"shardID"` + Timestamp time.Duration `json:"timestamp,omitempty"` +} diff --git a/data/logs.go b/data/logs.go index 1b568fe7..2daf0179 100644 --- a/data/logs.go +++ b/data/logs.go @@ -38,4 +38,5 @@ type PreparedLogsResults struct { NFTsDataUpdates []*NFTDataUpdate TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties DBLogs []*Logs + DBEvents []*LogEvent } diff --git a/data/scresult.go b/data/scresult.go index a2f595ce..b246528d 100644 --- a/data/scresult.go +++ b/data/scresult.go @@ -39,6 +39,7 @@ type ScResult struct { CanBeIgnored bool `json:"canBeIgnored,omitempty"` OriginalSender string `json:"originalSender,omitempty"` HasLogs bool `json:"hasLogs,omitempty"` + ExecutionOrder int `json:"-"` SenderAddressBytes []byte `json:"-"` InitialTxGasUsed uint64 `json:"-"` InitialTxFee string `json:"-"` diff --git a/data/transaction.go b/data/transaction.go index 04b7364f..d5cefd56 100644 --- a/data/transaction.go +++ b/data/transaction.go @@ -5,8 +5,8 @@ import ( ) // Transaction is a structure containing all the fields that need -// to be saved for a transaction. It has all the default fields -// plus some extra information for ease of search and filter +// to be saved for a transaction. It has all the default fields +// plus some extra information for ease of search and filter type Transaction struct { MBHash string `json:"miniBlockHash"` Nonce uint64 `json:"nonce"` @@ -48,6 +48,7 @@ type Transaction struct { GuardianSignature string `json:"guardianSignature,omitempty"` ErrorEvent bool `json:"errorEvent,omitempty"` CompletedEvent bool `json:"completedEvent,omitempty"` + ExecutionOrder int `json:"-"` SmartContractResults []*ScResult `json:"-"` Hash string `json:"-"` BlockHash string `json:"-"` diff --git a/docker-compose.yml b/docker-compose.yml index 7b03b074..970f0b40 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.0" services: elasticsearch: container_name: es-container diff --git a/go.mod b/go.mod index 2dd2ccb9..962aae85 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.12.0 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 - github.com/multiversx/mx-chain-communication-go v1.0.12 - github.com/multiversx/mx-chain-core-go v1.2.16 - github.com/multiversx/mx-chain-logger-go v1.0.13 - github.com/multiversx/mx-chain-vm-common-go v1.5.2 + github.com/multiversx/mx-chain-communication-go v1.0.14 + github.com/multiversx/mx-chain-core-go v1.2.19 + github.com/multiversx/mx-chain-logger-go v1.0.14 + github.com/multiversx/mx-chain-vm-common-go v1.5.12 github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.37.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 87bf48e2..483734ce 100644 --- a/go.sum +++ b/go.sum @@ -247,15 +247,15 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/multiversx/mx-chain-communication-go v1.0.12 h1:67WOaf87gpwouydD1AAOHw5LMGZh7NfITrp/KqFY3Tw= -github.com/multiversx/mx-chain-communication-go v1.0.12/go.mod h1:+oaUowpq+SqrEmAsMPGwhz44g7L81loWb6AiNQU9Ms4= -github.com/multiversx/mx-chain-core-go v1.2.16 h1:m0hUNmZQjGJxKDLQOHoM9jSaeDfVTbyd+mqiS8+NckE= -github.com/multiversx/mx-chain-core-go v1.2.16/go.mod h1:BILOGHUOIG5dNNX8cgkzCNfDaVtoYrJRYcPnpxRMH84= -github.com/multiversx/mx-chain-crypto-go v1.2.8 h1:wOgVlUaO5X4L8iEbFjcQcL8SZvv6WZ7LqH73BiRPhxU= -github.com/multiversx/mx-chain-logger-go v1.0.13 h1:eru/TETo0MkO4ZTnXsQDKf4PBRpAXmqjT02klNT/JnY= -github.com/multiversx/mx-chain-logger-go v1.0.13/go.mod h1:MZJhTAtZTJxT+yK2EHc4ZW3YOHUc1UdjCD0iahRNBZk= -github.com/multiversx/mx-chain-vm-common-go v1.5.2 h1:iRWJNlogjkq9w+pJZIfkVkXQFmMoRxZr6pzCfg2/K68= -github.com/multiversx/mx-chain-vm-common-go v1.5.2/go.mod h1:sqkKMCnwkWl8DURdb9q7pctK8IANghdHY1KJLE0ox2c= +github.com/multiversx/mx-chain-communication-go v1.0.14 h1:YhAUDjBBpc5h5W0A7LHLXUMIMeCgwgGvkqfAPbFqsno= +github.com/multiversx/mx-chain-communication-go v1.0.14/go.mod h1:qYCqgk0h+YpcTA84jHIpCBy6UShRwmXzHSCcdfwNrkw= +github.com/multiversx/mx-chain-core-go v1.2.19 h1:2BaVHkB0tro3cjs5ay2pmLup1loCV0e1p9jV5QW0xqc= +github.com/multiversx/mx-chain-core-go v1.2.19/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= +github.com/multiversx/mx-chain-crypto-go v1.2.11 h1:MNPJoiTJA5/tedYrI0N22OorbsKDESWG0SF8MCJwcJI= +github.com/multiversx/mx-chain-logger-go v1.0.14 h1:PRMpAvXE7Nec2d//QNmbYfKVHMomOKmcN4UXurQWX9o= +github.com/multiversx/mx-chain-logger-go v1.0.14/go.mod h1:bDfHSdwqIimn7Gp8w+SH5KlDuGzJ//nlyEANAaTSc3o= +github.com/multiversx/mx-chain-vm-common-go v1.5.12 h1:Q8F6DE7XhgHtWgg2rozSv4Tv5fE3ENkJz6mjRoAfht8= +github.com/multiversx/mx-chain-vm-common-go v1.5.12/go.mod h1:Sv6iS1okB6gy3HAsW6KHYtAxShNAfepKLtu//AURI8c= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/integrationtests/accountsBalanceNftTransfer_test.go b/integrationtests/accountsBalanceNftTransfer_test.go index 7dbd2023..aaf44e00 100644 --- a/integrationtests/accountsBalanceNftTransfer_test.go +++ b/integrationtests/accountsBalanceNftTransfer_test.go @@ -34,6 +34,7 @@ func createOutportBlockWithHeader( TransactionPool: pool, AlteredAccounts: coreAlteredAccounts, NumberOfShards: numOfShards, + ShardID: header.GetShardID(), }, Header: header, } diff --git a/integrationtests/logsCrossShard_test.go b/integrationtests/logsCrossShard_test.go index aa62f94a..7c96688f 100644 --- a/integrationtests/logsCrossShard_test.go +++ b/integrationtests/logsCrossShard_test.go @@ -30,13 +30,21 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { Round: 50, TimeStamp: 5040, } - body := &dataBlock.Body{} + + txHash := []byte("cross-log") + logID := hex.EncodeToString(txHash) + + body := &dataBlock.Body{ + MiniBlocks: []*dataBlock.MiniBlock{ + { + TxHashes: [][]byte{txHash}, + }, + }, + } address1 := "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99" address2 := "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7" - logID := hex.EncodeToString([]byte("cross-log")) - // index on source pool := &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -55,6 +63,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: &transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) @@ -68,10 +82,20 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { string(genericResponse.Docs[0].Source), ) + event1ID := logID + "-0-0" + ids = []string{event1ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-transfer-source-first.json"), + string(genericResponse.Docs[0].Source), + ) + // INDEX ON DESTINATION header = &dataBlock.Header{ Round: 50, TimeStamp: 6040, + ShardID: 1, } pool = &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -96,10 +120,17 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: &transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) + ids = []string{logID} err = esClient.DoMultiGet(context.Background(), ids, indexerdata.LogsIndex, true, genericResponse) require.Nil(t, err) require.JSONEq(t, @@ -107,6 +138,19 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { string(genericResponse.Docs[0].Source), ) + event2ID, event3ID := logID+"-1-0", logID+"-1-1" + ids = []string{event2ID, event3ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-transfer-destination.json"), + string(genericResponse.Docs[0].Source), + ) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-do-something.json"), + string(genericResponse.Docs[1].Source), + ) + // index on source again should not change the log header = &dataBlock.Header{ Round: 50, @@ -129,10 +173,17 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: &transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) + ids = []string{logID} err = esClient.DoMultiGet(context.Background(), ids, indexerdata.LogsIndex, true, genericResponse) require.Nil(t, err) require.JSONEq(t, @@ -147,6 +198,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { MiniBlockHeaders: []dataBlock.MiniBlockHeader{ {}, }, + ShardID: 1, } body = &dataBlock.Body{ MiniBlocks: []*dataBlock.MiniBlock{ @@ -163,4 +215,11 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { require.Nil(t, err) require.False(t, genericResponse.Docs[0].Found) + + ids = []string{event2ID, event3ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + + require.False(t, genericResponse.Docs[0].Found) + require.False(t, genericResponse.Docs[1].Found) } diff --git a/integrationtests/testdata/logsCrossShard/event-do-something.json b/integrationtests/testdata/logsCrossShard/event-do-something.json new file mode 100644 index 00000000..2490eea4 --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-do-something.json @@ -0,0 +1,14 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "do-something", + "address": "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7", + "topics": [ + "746f70696331", + "746f70696332" + ], + "shardID": 1, + "txHash": "63726f73732d6c6f67", + "order": 1, + "timestamp": 6040, + "txOrder": 0 +} diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-destination.json b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json new file mode 100644 index 00000000..c373f9be --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json @@ -0,0 +1,15 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "ESDTTransfer", + "address": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "topics": [ + "455344542d61626364", + "", + "01" + ], + "shardID": 1, + "txHash": "63726f73732d6c6f67", + "order": 0, + "timestamp": 6040, + "txOrder": 0 +} diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json new file mode 100644 index 00000000..d085f338 --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json @@ -0,0 +1,15 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "ESDTTransfer", + "address": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "topics": [ + "455344542d61626364", + "", + "01" + ], + "shardID": 0, + "txOrder": 0, + "txHash": "63726f73732d6c6f67", + "order": 0, + "timestamp": 5040 +} diff --git a/integrationtests/utils.go b/integrationtests/utils.go index ee243b2a..27f07c23 100644 --- a/integrationtests/utils.go +++ b/integrationtests/utils.go @@ -57,7 +57,7 @@ func CreateElasticProcessor( ValidatorPubkeyConverter: mock.NewPubkeyConverterMock(32), DBClient: esClient, EnabledIndexes: []string{dataindexer.TransactionsIndex, dataindexer.LogsIndex, dataindexer.AccountsESDTIndex, dataindexer.ScResultsIndex, - dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, + dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, dataindexer.EventsIndex, dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex, dataindexer.ValuesIndex}, Denomination: 18, } diff --git a/process/dataindexer/constants.go b/process/dataindexer/constants.go index 05196c63..229fc5d8 100644 --- a/process/dataindexer/constants.go +++ b/process/dataindexer/constants.go @@ -47,6 +47,8 @@ const ( ESDTsIndex = "esdts" // ValuesIndex is the Elasticsearch index for extra indexer information ValuesIndex = "values" + // EventsIndex is the Elasticsearch index for log events + EventsIndex = "events" // TransactionsPolicy is the Elasticsearch policy for the transactions TransactionsPolicy = "transactions_policy" diff --git a/process/elasticproc/block/blockProcessor.go b/process/elasticproc/block/blockProcessor.go index c23e3e1b..a182f358 100644 --- a/process/elasticproc/block/blockProcessor.go +++ b/process/elasticproc/block/blockProcessor.go @@ -73,7 +73,7 @@ func (bp *blockProcessor) PrepareBlockForDB(obh *outport.OutportBlockWithHeader) } sizeTxs := computeSizeOfTransactions(obh.TransactionPool) - miniblocksHashes := bp.getEncodedMBSHashes(obh.BlockData.Body) + miniblocksHashes := bp.getEncodedMBSHashes(obh.BlockData.Body, obh.BlockData.IntraShardMiniBlocks) leaderIndex := bp.getLeaderIndex(obh.SignersIndexes) numTxs, notarizedTxs := getTxsCount(obh.Header) @@ -126,7 +126,9 @@ func (bp *blockProcessor) PrepareBlockForDB(obh *outport.OutportBlockWithHeader) } bp.addEpochStartInfoForMeta(obh.Header, elasticBlock) - putMiniblocksDetailsInBlock(obh.Header, elasticBlock, obh.TransactionPool, obh.BlockData.Body) + + appendBlockDetailsFromHeaders(elasticBlock, obh.Header, obh.BlockData.Body, obh.TransactionPool) + appendBlockDetailsFromIntraShardMbs(elasticBlock, obh.BlockData.IntraShardMiniBlocks, obh.TransactionPool, len(obh.Header.GetMiniBlockHeaderHandlers())) return elasticBlock, nil } @@ -227,9 +229,10 @@ func (bp *blockProcessor) addEpochStartShardDataForMeta(epochStartShardData node block.EpochStartShardsData = append(block.EpochStartShardsData, shardData) } -func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body) []string { +func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body, intraShardMbs []*nodeBlock.MiniBlock) []string { miniblocksHashes := make([]string, 0) - for _, miniblock := range body.MiniBlocks { + mbs := append(body.MiniBlocks, intraShardMbs...) + for _, miniblock := range mbs { mbHash, errComputeHash := core.CalculateHash(bp.marshalizer, bp.hasher, miniblock) if errComputeHash != nil { log.Warn("internal error computing hash", "error", errComputeHash) @@ -244,10 +247,8 @@ func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body) []string { return miniblocksHashes } -func putMiniblocksDetailsInBlock(header coreData.HeaderHandler, block *data.Block, pool *outport.TransactionPool, body *block.Body) { - mbHeaders := header.GetMiniBlockHeaderHandlers() - - for idx, mbHeader := range mbHeaders { +func appendBlockDetailsFromHeaders(block *data.Block, header coreData.HeaderHandler, body *block.Body, pool *outport.TransactionPool) { + for idx, mbHeader := range header.GetMiniBlockHeaderHandlers() { mbType := nodeBlock.Type(mbHeader.GetTypeInt32()) if mbType == nodeBlock.PeerBlock { continue @@ -268,6 +269,42 @@ func putMiniblocksDetailsInBlock(header coreData.HeaderHandler, block *data.Bloc } } +func appendBlockDetailsFromIntraShardMbs(block *data.Block, intraShardMbs []*block.MiniBlock, pool *outport.TransactionPool, offset int) { + for idx, intraMB := range intraShardMbs { + if intraMB.Type == nodeBlock.PeerBlock || intraMB.Type == nodeBlock.ReceiptBlock { + continue + } + + block.MiniBlocksDetails = append(block.MiniBlocksDetails, &data.MiniBlocksDetails{ + IndexFirstProcessedTx: 0, + IndexLastProcessedTx: int32(len(intraMB.GetTxHashes()) - 1), + SenderShardID: intraMB.GetSenderShardID(), + ReceiverShardID: intraMB.GetReceiverShardID(), + MBIndex: idx + offset, + Type: intraMB.Type.String(), + ProcessingType: nodeBlock.Normal.String(), + TxsHashes: hexEncodeSlice(intraMB.TxHashes), + ExecutionOrderTxsIndices: extractExecutionOrderIntraShardMBUnsigned(intraMB, pool), + }) + } +} + +func extractExecutionOrderIntraShardMBUnsigned(mb *block.MiniBlock, pool *outport.TransactionPool) []int { + executionOrderTxsIndices := make([]int, len(mb.TxHashes)) + for idx, txHash := range mb.TxHashes { + executionOrder, found := getExecutionOrderForTx(txHash, int32(mb.Type), pool) + if !found { + log.Warn("blockProcessor.extractExecutionOrderIntraShardMBUnsigned cannot find tx in pool", "txHash", hex.EncodeToString(txHash)) + executionOrderTxsIndices[idx] = notFound + continue + } + + executionOrderTxsIndices[idx] = int(executionOrder) + } + + return executionOrderTxsIndices +} + func extractExecutionOrderIndicesFromPool(mbHeader coreData.MiniBlockHeaderHandler, txsHashes [][]byte, pool *outport.TransactionPool) []int { mbType := mbHeader.GetTypeInt32() executionOrderTxsIndices := make([]int, len(txsHashes)) diff --git a/process/elasticproc/block/blockProcessor_test.go b/process/elasticproc/block/blockProcessor_test.go index d727fc1f..b0208c23 100644 --- a/process/elasticproc/block/blockProcessor_test.go +++ b/process/elasticproc/block/blockProcessor_test.go @@ -368,7 +368,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { } mbhrBytes, _ := gogoMarshaller.Marshal(mbhr) - txHash, notExecutedTxHash, notFoundTxHash, invalidTxHash, rewardsTxHash, scrHash := "tx", "notExecuted", "notFound", "invalid", "reward", "scr" + txHash, notExecutedTxHash, notFoundTxHash, invalidTxHash, rewardsTxHash, scrHash, intraSCR := "tx", "notExecuted", "notFound", "invalid", "reward", "scr", "intraSCR" header := &dataBlock.Header{ TxCount: 5, @@ -397,6 +397,12 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { Header: header, OutportBlock: &outport.OutportBlock{ BlockData: &outport.BlockData{ + IntraShardMiniBlocks: []*dataBlock.MiniBlock{ + { + Type: dataBlock.SmartContractResultBlock, + TxHashes: [][]byte{[]byte(intraSCR)}, + }, + }, HeaderBytes: headerBytes, HeaderHash: []byte("hash"), Body: &dataBlock.Body{ @@ -446,6 +452,10 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { SmartContractResult: &smartContractResult.SmartContractResult{}, ExecutionOrder: 0, }, + hex.EncodeToString([]byte(intraSCR)): { + SmartContractResult: &smartContractResult.SmartContractResult{}, + ExecutionOrder: 4, + }, }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, @@ -458,7 +468,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { require.Equal(t, &data.Block{ Hash: "68617368", Size: int64(723), - SizeTxs: 15, + SizeTxs: 21, AccumulatedFees: "0", DeveloperFees: "0", TxCount: uint32(5), @@ -468,6 +478,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { "1183f422a5b76c3cb7b439334f1fe7235c8d09f577e0f1e15e62cd05b9a81950", "b24e307f3917e84603d3ebfb9c03c8fc651b62cb68ca884c3ff015b66a610a79", "c0a855563172b2f72be569963d26d4fae38d4371342e2bf3ded93466a72f36f3", + "381b0f52b35781ddce70dc7ee08907a29f49ed9c46ea0b7b59e5833ba3213d10", }, MiniBlocksDetails: []*data.MiniBlocksDetails{ { @@ -502,6 +513,13 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { ProcessingType: dataBlock.Normal.String(), ExecutionOrderTxsIndices: []int{0}, TxsHashes: []string{"736372"}}, + {IndexFirstProcessedTx: 0, + IndexLastProcessedTx: 0, + MBIndex: 4, + Type: dataBlock.SmartContractResultBlock.String(), + ProcessingType: dataBlock.Normal.String(), + ExecutionOrderTxsIndices: []int{4}, + TxsHashes: []string{"696e747261534352"}}, }, }, dbBlock) } diff --git a/process/elasticproc/converters/field.go b/process/elasticproc/converters/field.go index d96d3571..27b6be81 100644 --- a/process/elasticproc/converters/field.go +++ b/process/elasticproc/converters/field.go @@ -21,7 +21,7 @@ func TruncateFieldIfExceedsMaxLengthBase64(field string) string { return field } -//TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded +// TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded func TruncateSliceElementsIfExceedsMaxLength(fields []string) []string { var localFields []string for _, field := range fields { diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go index 7306abf6..640e5f09 100644 --- a/process/elasticproc/elasticProcessor.go +++ b/process/elasticproc/elasticProcessor.go @@ -30,7 +30,7 @@ var ( elasticIndexer.TransactionsIndex, elasticIndexer.BlockIndex, elasticIndexer.MiniblocksIndex, elasticIndexer.RatingIndex, elasticIndexer.RoundsIndex, elasticIndexer.ValidatorsIndex, elasticIndexer.AccountsIndex, elasticIndexer.AccountsHistoryIndex, elasticIndexer.ReceiptsIndex, elasticIndexer.ScResultsIndex, elasticIndexer.AccountsESDTHistoryIndex, elasticIndexer.AccountsESDTIndex, elasticIndexer.EpochInfoIndex, elasticIndexer.SCDeploysIndex, elasticIndexer.TokensIndex, elasticIndexer.TagsIndex, elasticIndexer.LogsIndex, elasticIndexer.DelegatorsIndex, elasticIndexer.OperationsIndex, - elasticIndexer.ESDTsIndex, elasticIndexer.ValuesIndex, + elasticIndexer.ESDTsIndex, elasticIndexer.ValuesIndex, elasticIndexer.EventsIndex, } ) @@ -328,6 +328,11 @@ func (ei *elasticProcessor) RemoveTransactions(header coreData.HeaderHandler, bo return err } + err = ei.removeFromIndexByTimestampAndShardID(header.GetTimeStamp(), header.GetShardID(), elasticIndexer.EventsIndex) + if err != nil { + return err + } + return ei.updateDelegatorsInCaseOfRevert(header, body) } @@ -360,20 +365,21 @@ func (ei *elasticProcessor) removeIfHashesNotEmpty(index string, hashes []string // RemoveAccountsESDT will remove data from accountsesdt index and accountsesdthistory func (ei *elasticProcessor) RemoveAccountsESDT(headerTimestamp uint64, shardID uint32) error { - ctxWithValue := context.WithValue(context.Background(), request.ContextKey, request.ExtendTopicWithShardID(request.RemoveTopic, shardID)) - query := fmt.Sprintf(`{"query": {"bool": {"must": [{"match": {"shardID": {"query": %d,"operator": "AND"}}},{"match": {"timestamp": {"query": "%d","operator": "AND"}}}]}}}`, shardID, headerTimestamp) - err := ei.elasticClient.DoQueryRemove( - ctxWithValue, - elasticIndexer.AccountsESDTIndex, - bytes.NewBuffer([]byte(query)), - ) + err := ei.removeFromIndexByTimestampAndShardID(headerTimestamp, shardID, elasticIndexer.AccountsESDTIndex) if err != nil { return err } + return ei.removeFromIndexByTimestampAndShardID(headerTimestamp, shardID, elasticIndexer.AccountsESDTHistoryIndex) +} + +func (ei *elasticProcessor) removeFromIndexByTimestampAndShardID(headerTimestamp uint64, shardID uint32, index string) error { + ctxWithValue := context.WithValue(context.Background(), request.ContextKey, request.ExtendTopicWithShardID(request.RemoveTopic, shardID)) + query := fmt.Sprintf(`{"query": {"bool": {"must": [{"match": {"shardID": {"query": %d,"operator": "AND"}}},{"match": {"timestamp": {"query": "%d","operator": "AND"}}}]}}}`, shardID, headerTimestamp) + return ei.elasticClient.DoQueryRemove( ctxWithValue, - elasticIndexer.AccountsESDTHistoryIndex, + index, bytes.NewBuffer([]byte(query)), ) } @@ -424,7 +430,12 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader return err } - err = ei.prepareAndIndexLogs(logsData.DBLogs, buffers) + err = ei.indexLogs(logsData.DBLogs, buffers) + if err != nil { + return err + } + + err = ei.indexEvents(logsData.DBEvents, buffers) if err != nil { return err } @@ -511,7 +522,7 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]* return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex) } -func (ei *elasticProcessor) prepareAndIndexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { +func (ei *elasticProcessor) indexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.LogsIndex) { return nil } @@ -519,6 +530,14 @@ func (ei *elasticProcessor) prepareAndIndexLogs(logsDB []*data.Logs, buffSlice * return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex) } +func (ei *elasticProcessor) indexEvents(eventsDB []*data.LogEvent, buffSlice *data.BufferSlice) error { + if !ei.isIndexEnabled(elasticIndexer.EventsIndex) { + return nil + } + + return ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex) +} + func (ei *elasticProcessor) indexScDeploys(deployData map[string]*data.ScDeployInfo, changeOwnerOperation map[string]*data.OwnerData, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.SCDeploysIndex) { return nil diff --git a/process/elasticproc/elasticProcessor_test.go b/process/elasticproc/elasticProcessor_test.go index 381db5db..9f3311af 100644 --- a/process/elasticproc/elasticProcessor_test.go +++ b/process/elasticproc/elasticProcessor_test.go @@ -463,10 +463,18 @@ func TestElasticProcessor_RemoveTransactions(t *testing.T) { dbWriter := &mock.DatabaseWriterStub{ DoQueryRemoveCalled: func(index string, body *bytes.Buffer) error { bodyStr := body.String() - require.Contains(t, []string{dataindexer.TransactionsIndex, dataindexer.OperationsIndex, dataindexer.LogsIndex}, index) - require.True(t, strings.Contains(bodyStr, expectedHashes[0])) - require.True(t, strings.Contains(bodyStr, expectedHashes[1])) - called = true + require.Contains(t, []string{dataindexer.TransactionsIndex, dataindexer.OperationsIndex, dataindexer.LogsIndex, dataindexer.EventsIndex}, index) + if index != dataindexer.EventsIndex { + require.True(t, strings.Contains(bodyStr, expectedHashes[0])) + require.True(t, strings.Contains(bodyStr, expectedHashes[1])) + called = true + } else { + require.Equal(t, + `{"query": {"bool": {"must": [{"match": {"shardID": {"query": 4294967295,"operator": "AND"}}},{"match": {"timestamp": {"query": "0","operator": "AND"}}}]}}}`, + body.String(), + ) + } + return nil }, } diff --git a/process/elasticproc/interface.go b/process/elasticproc/interface.go index e59357cd..997697b2 100644 --- a/process/elasticproc/interface.go +++ b/process/elasticproc/interface.go @@ -99,6 +99,7 @@ type DBLogsAndEventsHandler interface { numOfShards uint32, ) *data.PreparedLogsResults + SerializeEvents(events []*data.LogEvent, buffSlice *data.BufferSlice, index string) error SerializeLogs(logs []*data.Logs, buffSlice *data.BufferSlice, index string) error SerializeSCDeploys(deploysInfo map[string]*data.ScDeployInfo, buffSlice *data.BufferSlice, index string) error SerializeChangeOwnerOperations(changeOwnerOperations map[string]*data.OwnerData, buffSlice *data.BufferSlice, index string) error diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor.go b/process/elasticproc/logsevents/logsAndEventsProcessor.go index 016b8d93..d3076699 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor.go @@ -1,6 +1,8 @@ package logsevents import ( + "encoding/hex" + "fmt" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -14,6 +16,8 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" ) +const eventIDFormat = "%s-%d-%d" + // ArgsLogsAndEventsProcessor holds all dependencies required to create new instances of logsAndEventsProcessor type ArgsLogsAndEventsProcessor struct { PubKeyConverter core.PubkeyConverter @@ -112,6 +116,8 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( } } + dbLogs, dbEvents := lep.prepareLogsForDB(lgData, logsAndEvents, timestamp, shardID) + return &data.PreparedLogsResults{ Tokens: lgData.tokens, ScDeploys: lgData.scDeploys, @@ -122,7 +128,8 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( TokenRolesAndProperties: lgData.tokenRolesAndProperties, TxHashStatusInfo: lgData.txHashStatusInfoProc.getAllRecords(), ChangeOwnerOperations: lgData.changeOwnerOperations, - DBLogs: lep.prepareLogsForDB(lgData, logsAndEvents, timestamp), + DBLogs: dbLogs, + DBEvents: dbEvents, } } @@ -185,18 +192,23 @@ func (lep *logsAndEventsProcessor) prepareLogsForDB( lgData *logsData, logsAndEvents []*outport.LogData, timestamp uint64, -) []*data.Logs { + shardID uint32, +) ([]*data.Logs, []*data.LogEvent) { logs := make([]*data.Logs, 0, len(logsAndEvents)) + events := make([]*data.LogEvent, 0) for _, txLog := range logsAndEvents { if txLog == nil { continue } - logs = append(logs, lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp)) + dbLog, logEvents := lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp, shardID) + + logs = append(logs, dbLog) + events = append(events, logEvents...) } - return logs + return logs, events } func (lep *logsAndEventsProcessor) prepareLog( @@ -204,13 +216,9 @@ func (lep *logsAndEventsProcessor) prepareLog( logHashHex string, eventLogs *transaction.Log, timestamp uint64, -) *data.Logs { - originalTxHash := "" - scr, ok := lgData.scrsMap[logHashHex] - if ok { - originalTxHash = scr.OriginalTxHash - } - + shardID uint32, +) (*data.Logs, []*data.LogEvent) { + originalTxHash := lep.getOriginalTxHash(lgData, logHashHex) encodedAddr := lep.pubKeyConverter.SilentEncode(eventLogs.GetAddress(), log) logsDB := &data.Logs{ ID: logHashHex, @@ -220,22 +228,86 @@ func (lep *logsAndEventsProcessor) prepareLog( Events: make([]*data.Event, 0, len(eventLogs.Events)), } + dbEvents := make([]*data.LogEvent, 0, len(eventLogs.Events)) for idx, event := range eventLogs.Events { if check.IfNil(event) { continue } - encodedAddress := lep.pubKeyConverter.SilentEncode(event.GetAddress(), log) - - logsDB.Events = append(logsDB.Events, &data.Event{ - Address: encodedAddress, + logEvent := &data.Event{ + Address: lep.pubKeyConverter.SilentEncode(event.GetAddress(), log), Identifier: string(event.GetIdentifier()), Topics: event.GetTopics(), Data: event.GetData(), AdditionalData: event.GetAdditionalData(), Order: idx, - }) + } + logsDB.Events = append(logsDB.Events, logEvent) + + executionOrder := lep.getExecutionOrder(lgData, logHashHex) + dbEvents = append(dbEvents, lep.prepareLogEvent(logsDB, logEvent, shardID, executionOrder)) + } + + return logsDB, dbEvents +} + +func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data.Event, shardID uint32, execOrder int) *data.LogEvent { + dbEvent := &data.LogEvent{ + TxHash: dbLog.ID, + LogAddress: dbLog.Address, + Address: event.Address, + Identifier: event.Identifier, + Data: hex.EncodeToString(event.Data), + AdditionalData: hexEncodeSlice(event.AdditionalData), + Topics: hexEncodeSlice(event.Topics), + Order: event.Order, + ShardID: shardID, + TxOrder: execOrder, + OriginalTxHash: dbLog.OriginalTxHash, + Timestamp: dbLog.Timestamp, + ID: fmt.Sprintf(eventIDFormat, dbLog.ID, shardID, event.Order), + } + + return dbEvent +} + +func (lep *logsAndEventsProcessor) getOriginalTxHash(lgData *logsData, logHashHex string) string { + if lgData.scrsMap == nil { + return "" + } + + scr, ok := lgData.scrsMap[logHashHex] + if ok { + return scr.OriginalTxHash + } + + return "" +} + +func (lep *logsAndEventsProcessor) getExecutionOrder(lgData *logsData, logHashHex string) int { + tx, ok := lgData.txsMap[logHashHex] + if ok { + return tx.ExecutionOrder + } + + scr, ok := lgData.scrsMap[logHashHex] + if ok { + return scr.ExecutionOrder + } + + log.Warn("cannot find hash in the txs map or scrs map", "hash", logHashHex) + + return -1 +} + +func hexEncodeSlice(input [][]byte) []string { + hexEncoded := make([]string, 0, len(input)) + for idx := 0; idx < len(input); idx++ { + hexEncoded = append(hexEncoded, hex.EncodeToString(input[idx])) + } + if len(hexEncoded) == 0 { + return nil } - return logsDB + return hexEncoded } diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go index 8ad3b785..321e007b 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go @@ -295,3 +295,83 @@ func TestLogsAndEventsProcessor_ExtractDataFromLogsNFTBurn(t *testing.T) { require.Equal(t, "MY-NFT", tokensSupply[0].Token) require.Equal(t, "MY-NFT-02", tokensSupply[0].Identifier) } + +func TestPrepareLogsAndEvents_LogEvents(t *testing.T) { + t.Parallel() + + logsAndEvents := []*outport.LogData{ + { + TxHash: hex.EncodeToString([]byte("txHash")), + Log: &transaction.Log{ + Address: []byte("address"), + Events: []*transaction.Event{ + { + Address: []byte("addr"), + Identifier: []byte(core.BuiltInFunctionESDTNFTTransfer), + Topics: [][]byte{[]byte("my-token"), big.NewInt(0).SetUint64(1).Bytes(), []byte("receiver")}, + AdditionalData: [][]byte{[]byte("something")}, + }, + { + Address: []byte("addr"), + Identifier: []byte(core.SCDeployIdentifier), + Topics: [][]byte{[]byte("my-token"), big.NewInt(0).SetUint64(1).Bytes()}, + Data: []byte("here"), + AdditionalData: [][]byte{[]byte("something")}, + }, + }, + }, + }, + } + + args := createMockArgs() + proc, _ := NewLogsAndEventsProcessor(args) + + results := proc.ExtractDataFromLogs(logsAndEvents, &data.PreparedResults{ScResults: []*data.ScResult{ + { + Hash: "747848617368", + OriginalTxHash: "originalHash", + }, + }}, 1234, 1, 3) + + require.Equal(t, []*data.LogEvent{ + { + ID: "747848617368-1-0", + TxHash: "747848617368", + OriginalTxHash: "originalHash", + LogAddress: "61646472657373", + Address: "61646472", + Identifier: "ESDTNFTTransfer", + AdditionalData: []string{"736f6d657468696e67"}, + Topics: []string{"6d792d746f6b656e", "01", "7265636569766572"}, + Order: 0, + ShardID: 1, + Timestamp: 1234, + TxOrder: 0, + }, + { + ID: "747848617368-1-1", + TxHash: "747848617368", + OriginalTxHash: "originalHash", + LogAddress: "61646472657373", + Address: "61646472", + Identifier: "SCDeploy", + Data: "68657265", + AdditionalData: []string{"736f6d657468696e67"}, + Topics: []string{"6d792d746f6b656e", "01"}, + Order: 1, + ShardID: 1, + Timestamp: 1234, + TxOrder: 0, + }, + }, results.DBEvents) +} + +func TestHexEncodeSlice(t *testing.T) { + t.Parallel() + + require.Equal(t, []string(nil), hexEncodeSlice(nil)) + require.Equal(t, []string(nil), hexEncodeSlice([][]byte{})) + require.Equal(t, []string{"61", ""}, hexEncodeSlice([][]byte{[]byte("a"), nil})) + require.Equal(t, []string{""}, hexEncodeSlice([][]byte{big.NewInt(0).Bytes()})) + require.Equal(t, []string{"61", "62"}, hexEncodeSlice([][]byte{[]byte("a"), []byte("b")})) +} diff --git a/process/elasticproc/logsevents/serialize.go b/process/elasticproc/logsevents/serialize.go index 488b1282..a96a125a 100644 --- a/process/elasticproc/logsevents/serialize.go +++ b/process/elasticproc/logsevents/serialize.go @@ -10,6 +10,45 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokeninfo" ) +// SerializeEvents will serialize the provided events in a way that Elasticsearch expects a bulk request +func (*logsAndEventsProcessor) SerializeEvents(events []*data.LogEvent, buffSlice *data.BufferSlice, index string) error { + for _, event := range events { + meta := []byte(fmt.Sprintf(`{ "update" : { "_index":"%s", "_id" : "%s" } }%s`, index, converters.JsonEscape(event.ID), "\n")) + serializedData, errMarshal := json.Marshal(event) + if errMarshal != nil { + return errMarshal + } + + codeToExecute := ` + if ('create' == ctx.op) { + ctx._source = params.event + } else { + if (ctx._source.containsKey('timestamp')) { + if (ctx._source.timestamp <= params.event.timestamp) { + ctx._source = params.event + } + } else { + ctx._source = params.event + } + } +` + serializedDataStr := fmt.Sprintf(`{"scripted_upsert": true, "script": {`+ + `"source": "%s",`+ + `"lang": "painless",`+ + `"params": { "event": %s }},`+ + `"upsert": {}}`, + converters.FormatPainlessSource(codeToExecute), serializedData, + ) + + err := buffSlice.PutData(meta, []byte(serializedDataStr)) + if err != nil { + return err + } + } + + return nil +} + // SerializeLogs will serialize the provided logs in a way that Elasticsearch expects a bulk request func (*logsAndEventsProcessor) SerializeLogs(logs []*data.Logs, buffSlice *data.BufferSlice, index string) error { for _, lg := range logs { diff --git a/process/elasticproc/templatesAndPolicies/noKibana.go b/process/elasticproc/templatesAndPolicies/noKibana.go index aac49506..a6928858 100644 --- a/process/elasticproc/templatesAndPolicies/noKibana.go +++ b/process/elasticproc/templatesAndPolicies/noKibana.go @@ -41,6 +41,7 @@ func (tr *templatesAndPolicyReaderNoKibana) GetElasticTemplatesAndPolicies() (ma indexTemplates[indexer.OperationsIndex] = noKibana.Operations.ToBuffer() indexTemplates[indexer.ESDTsIndex] = noKibana.ESDTs.ToBuffer() indexTemplates[indexer.ValuesIndex] = noKibana.Values.ToBuffer() + indexTemplates[indexer.EventsIndex] = noKibana.Events.ToBuffer() return indexTemplates, indexPolicies, nil } diff --git a/process/elasticproc/templatesAndPolicies/noKibana_test.go b/process/elasticproc/templatesAndPolicies/noKibana_test.go index db0b12a0..7cbf22b9 100644 --- a/process/elasticproc/templatesAndPolicies/noKibana_test.go +++ b/process/elasticproc/templatesAndPolicies/noKibana_test.go @@ -14,5 +14,5 @@ func TestTemplatesAndPolicyReaderNoKibana_GetElasticTemplatesAndPolicies(t *test templates, policies, err := reader.GetElasticTemplatesAndPolicies() require.Nil(t, err) require.Len(t, policies, 0) - require.Len(t, templates, 22) + require.Len(t, templates, 23) } diff --git a/process/elasticproc/transactions/smartContractResultsProcessor.go b/process/elasticproc/transactions/smartContractResultsProcessor.go index e5f6a72e..2fcb7ccf 100644 --- a/process/elasticproc/transactions/smartContractResultsProcessor.go +++ b/process/elasticproc/transactions/smartContractResultsProcessor.go @@ -189,6 +189,7 @@ func (proc *smartContractResultsProcessor) prepareSmartContractResult( OriginalSender: originalSenderAddr, InitialTxFee: feeInfo.Fee.String(), InitialTxGasUsed: feeInfo.GasUsed, + ExecutionOrder: int(scrInfo.ExecutionOrder), } } diff --git a/process/elasticproc/transactions/transactionDBBuilder.go b/process/elasticproc/transactions/transactionDBBuilder.go index 727563ce..0b497ef7 100644 --- a/process/elasticproc/transactions/transactionDBBuilder.go +++ b/process/elasticproc/transactions/transactionDBBuilder.go @@ -11,7 +11,6 @@ import ( "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/receipt" - "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-es-indexer-go/data" "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters" @@ -122,17 +121,19 @@ func (dtb *dbTransactionBuilder) prepareTransaction( Version: tx.Version, GuardianAddress: guardianAddress, GuardianSignature: hex.EncodeToString(tx.GuardianSignature), + ExecutionOrder: int(txInfo.ExecutionOrder), } } func (dtb *dbTransactionBuilder) prepareRewardTransaction( - rTx *rewardTx.RewardTx, + rTxInfo *outport.RewardInfo, txHash []byte, mbHash []byte, mb *block.MiniBlock, header coreData.HeaderHandler, txStatus string, ) *data.Transaction { + rTx := rTxInfo.Reward valueNum, err := dtb.balanceConverter.ConvertBigValueToFloat(rTx.Value) if err != nil { log.Warn("dbTransactionBuilder.prepareRewardTransaction cannot compute value as num", "value", rTx.Value, @@ -142,23 +143,24 @@ func (dtb *dbTransactionBuilder) prepareRewardTransaction( receiverAddr := dtb.addressPubkeyConverter.SilentEncode(rTx.RcvAddr, log) return &data.Transaction{ - Hash: hex.EncodeToString(txHash), - MBHash: hex.EncodeToString(mbHash), - Nonce: 0, - Round: rTx.Round, - Value: rTx.Value.String(), - ValueNum: valueNum, - Receiver: receiverAddr, - Sender: fmt.Sprintf("%d", core.MetachainShardId), - ReceiverShard: mb.ReceiverShardID, - SenderShard: mb.SenderShardID, - GasPrice: 0, - GasLimit: 0, - Data: make([]byte, 0), - Signature: "", - Timestamp: time.Duration(header.GetTimeStamp()), - Status: txStatus, - Operation: rewardsOperation, + Hash: hex.EncodeToString(txHash), + MBHash: hex.EncodeToString(mbHash), + Nonce: 0, + Round: rTx.Round, + Value: rTx.Value.String(), + ValueNum: valueNum, + Receiver: receiverAddr, + Sender: fmt.Sprintf("%d", core.MetachainShardId), + ReceiverShard: mb.ReceiverShardID, + SenderShard: mb.SenderShardID, + GasPrice: 0, + GasLimit: 0, + Data: make([]byte, 0), + Signature: "", + Timestamp: time.Duration(header.GetTimeStamp()), + Status: txStatus, + Operation: rewardsOperation, + ExecutionOrder: int(rTxInfo.ExecutionOrder), } } diff --git a/process/elasticproc/transactions/transactionDBBuilder_test.go b/process/elasticproc/transactions/transactionDBBuilder_test.go index 5b3d8759..ceeca3de 100644 --- a/process/elasticproc/transactions/transactionDBBuilder_test.go +++ b/process/elasticproc/transactions/transactionDBBuilder_test.go @@ -116,7 +116,10 @@ func TestGetTransactionByType_RewardTx(t *testing.T) { header := &block.Header{Nonce: 2} status := "Success" - resultTx := cp.prepareRewardTransaction(rwdTx, txHash, mbHash, mb, header, status) + rewardInfo := &outport.RewardInfo{ + Reward: rwdTx, + } + resultTx := cp.prepareRewardTransaction(rewardInfo, txHash, mbHash, mb, header, status) expectedTx := &data.Transaction{ Hash: hex.EncodeToString(txHash), MBHash: hex.EncodeToString(mbHash), diff --git a/process/elasticproc/transactions/transactionsGrouper.go b/process/elasticproc/transactions/transactionsGrouper.go index c6073914..5aaf15a4 100644 --- a/process/elasticproc/transactions/transactionsGrouper.go +++ b/process/elasticproc/transactions/transactionsGrouper.go @@ -151,7 +151,7 @@ func (tg *txsGrouper) prepareRewardTxForDB( return nil, false } - dbTx := tg.txBuilder.prepareRewardTransaction(rtx.Reward, txHash, mbHash, mb, header, mbStatus) + dbTx := tg.txBuilder.prepareRewardTransaction(rtx, txHash, mbHash, mb, header, mbStatus) return dbTx, true } diff --git a/scripts/script.sh b/scripts/script.sh index 898fc6d3..11a1bdb4 100755 --- a/scripts/script.sh +++ b/scripts/script.sh @@ -4,7 +4,7 @@ PROMETHEUS_CONTAINER_NAME=prometheus_container GRAFANA_CONTAINER_NAME=grafana_container GRAFANA_VERSION=10.0.3 PROMETHEUS_VERSION=v2.46.0 -INDICES_LIST=("rating" "transactions" "blocks" "validators" "miniblocks" "rounds" "accounts" "accountshistory" "receipts" "scresults" "accountsesdt" "accountsesdthistory" "epochinfo" "scdeploys" "tokens" "tags" "logs" "delegators" "operations" "esdts" "values") +INDICES_LIST=("rating" "transactions" "blocks" "validators" "miniblocks" "rounds" "accounts" "accountshistory" "receipts" "scresults" "accountsesdt" "accountsesdthistory" "epochinfo" "scdeploys" "tokens" "tags" "logs" "delegators" "operations" "esdts" "values" "events") start() { diff --git a/templates/noKibana/events.go b/templates/noKibana/events.go new file mode 100644 index 00000000..bbdce60e --- /dev/null +++ b/templates/noKibana/events.go @@ -0,0 +1,54 @@ +package noKibana + +// Events will hold the configuration for the events index +var Events = Object{ + "index_patterns": Array{ + "events-*", + }, + "settings": Object{ + "number_of_shards": 5, + "number_of_replicas": 0, + }, + "mappings": Object{ + "properties": Object{ + "txHash": Object{ + "type": "keyword", + }, + "originalTxHash": Object{ + "type": "keyword", + }, + "logAddress": Object{ + "type": "keyword", + }, + "address": Object{ + "type": "keyword", + }, + "identifier": Object{ + "type": "keyword", + }, + "shardID": Object{ + "type": "long", + }, + "data": Object{ + "index": "false", + "type": "text", + }, + "additionalData": Object{ + "type": "text", + }, + "topics": Object{ + "type": "text", + }, + "order": Object{ + "type": "long", + }, + "txOrder": Object{ + "type": "long", + }, + "timestamp": Object{ + "type": "date", + "format": "epoch_second", + }, + }, + }, +} diff --git a/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go b/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go index b925759a..08eb98a7 100644 --- a/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go +++ b/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go @@ -42,9 +42,12 @@ func (cc *clusterChecker) CompareIndicesWithTimestamp() error { func (cc *clusterChecker) compareIndexWithTimestamp(index string) error { rspSource := &generalElasticResponse{} + + withSource := !cc.onlyIDs + nextScrollIDSource, doneSource, err := cc.clientSource.InitializeScroll( index, - getAllSortTimestampASC(true, cc.startTimestamp, cc.stopTimestamp), + getAllSortTimestampASC(withSource, cc.startTimestamp, cc.stopTimestamp), rspSource, ) if err != nil { @@ -54,7 +57,7 @@ func (cc *clusterChecker) compareIndexWithTimestamp(index string) error { rspDestination := &generalElasticResponse{} nextScrollIDDestination, doneDestination, err := cc.clientDestination.InitializeScroll( index, - getAllSortTimestampASC(true, cc.startTimestamp, cc.stopTimestamp), + getAllSortTimestampASC(withSource, cc.startTimestamp, cc.stopTimestamp), rspDestination, ) if err != nil { diff --git a/tools/clusters-checker/pkg/checkers/query.go b/tools/clusters-checker/pkg/checkers/query.go index bf090db0..de07ed0d 100644 --- a/tools/clusters-checker/pkg/checkers/query.go +++ b/tools/clusters-checker/pkg/checkers/query.go @@ -36,8 +36,8 @@ func getAllSortTimestampASC(withSource bool, start, stop int) []byte { "query": object{ "range": object{ "timestamp": object{ - "gte": start, - "lte": stop, + "gte": fmt.Sprintf("%d", start), + "lte": fmt.Sprintf("%d", stop), }, }, }, @@ -45,7 +45,7 @@ func getAllSortTimestampASC(withSource bool, start, stop int) []byte { "sort": []interface{}{ object{ "timestamp": object{ - "order": "asc", + "order": "desc", }, }, },