diff --git a/data/event.go b/data/event.go
index 8db4558f..f3e22668 100644
--- a/data/event.go
+++ b/data/event.go
@@ -14,6 +14,7 @@ type LogEvent struct {
 	AdditionalData []string      `json:"additionalData,omitempty"`
 	Topics         []string      `json:"topics"`
 	Order          int           `json:"order"`
+	TxOrder        int           `json:"txOrder"`
 	ShardID        uint32        `json:"shardID"`
 	Timestamp      time.Duration `json:"timestamp,omitempty"`
 }
diff --git a/data/scresult.go b/data/scresult.go
index a2f595ce..b246528d 100644
--- a/data/scresult.go
+++ b/data/scresult.go
@@ -39,6 +39,7 @@ type ScResult struct {
 	CanBeIgnored       bool   `json:"canBeIgnored,omitempty"`
 	OriginalSender     string `json:"originalSender,omitempty"`
 	HasLogs            bool   `json:"hasLogs,omitempty"`
+	ExecutionOrder     int    `json:"-"`
 	SenderAddressBytes []byte `json:"-"`
 	InitialTxGasUsed   uint64 `json:"-"`
 	InitialTxFee       string `json:"-"`
diff --git a/data/transaction.go b/data/transaction.go
index 04b7364f..d5cefd56 100644
--- a/data/transaction.go
+++ b/data/transaction.go
@@ -5,8 +5,8 @@ import (
 )
 
 // Transaction is a structure containing all the fields that need
-// to be saved for a transaction. It has all the default fields
-// plus some extra information for ease of search and filter
+// to be saved for a transaction. It has all the default fields
+// plus some extra information for ease of search and filter
 type Transaction struct {
 	MBHash               string      `json:"miniBlockHash"`
 	Nonce                uint64      `json:"nonce"`
@@ -48,6 +48,7 @@ type Transaction struct {
 	GuardianSignature    string      `json:"guardianSignature,omitempty"`
 	ErrorEvent           bool        `json:"errorEvent,omitempty"`
 	CompletedEvent       bool        `json:"completedEvent,omitempty"`
+	ExecutionOrder       int         `json:"-"`
 	SmartContractResults []*ScResult `json:"-"`
 	Hash                 string      `json:"-"`
 	BlockHash            string      `json:"-"`
diff --git a/docker-compose.yml b/docker-compose.yml
index 7b03b074..970f0b40 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,3 @@
-version: "3.0"
 services:
   elasticsearch:
     container_name: es-container
diff --git a/integrationtests/logsCrossShard_test.go b/integrationtests/logsCrossShard_test.go
index ab5aab17..7c96688f 100644
--- a/integrationtests/logsCrossShard_test.go
+++ b/integrationtests/logsCrossShard_test.go
@@ -30,13 +30,21 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 		Round:     50,
 		TimeStamp: 5040,
 	}
-	body := &dataBlock.Body{}
+
+	txHash := []byte("cross-log")
+	logID := hex.EncodeToString(txHash)
+
+	body := &dataBlock.Body{
+		MiniBlocks: []*dataBlock.MiniBlock{
+			{
+				TxHashes: [][]byte{txHash},
+			},
+		},
+	}
 
 	address1 := "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99"
 	address2 := "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7"
 
-	logID := hex.EncodeToString([]byte("cross-log"))
-
 	// index on source
 	pool := &outport.TransactionPool{
 		Logs: []*outport.LogData{
@@ -55,6 +63,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 				},
 			},
 		},
+		Transactions: map[string]*outport.TxInfo{
+			logID: {
+				Transaction:    &transaction.Transaction{},
+				ExecutionOrder: 0,
+			},
+		},
 	}
 	err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
 	require.Nil(t, err)
@@ -68,7 +82,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 		string(genericResponse.Docs[0].Source),
 	)
 
-	event1ID := "75dcc2d7542c8a8be1006dd2d0f8e847c00cea5e55b6b8a53e0a5483e73f4431"
+	event1ID := logID + "-0-0"
 	ids = []string{event1ID}
 	err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse)
 	require.Nil(t, err)
@@ -106,6 +120,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 				},
 			},
 		},
+		Transactions: map[string]*outport.TxInfo{
+			logID: {
+				Transaction:    &transaction.Transaction{},
+				ExecutionOrder: 0,
+			},
+		},
 	}
 	err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
 	require.Nil(t, err)
@@ -118,7 +138,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 		string(genericResponse.Docs[0].Source),
 	)
 
-	event2ID, event3ID := "c7d0e7abaaf188655537da1ed642b151182aa64bbe3fed316198208bf089713a", "3a6f93093be7b045938a2a03e45a059af602331602f63a45e5aec3866d3df126"
+	event2ID, event3ID := logID+"-1-0", logID+"-1-1"
 	ids = []string{event2ID, event3ID}
 	err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse)
 	require.Nil(t, err)
@@ -153,6 +173,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) {
 				},
 			},
 		},
+		Transactions: map[string]*outport.TxInfo{
+			logID: {
+				Transaction:    &transaction.Transaction{},
+				ExecutionOrder: 0,
+			},
+		},
 	}
 	err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards))
 	require.Nil(t, err)
diff --git a/integrationtests/testdata/logsCrossShard/event-do-something.json b/integrationtests/testdata/logsCrossShard/event-do-something.json
index 648ad501..2490eea4 100644
--- a/integrationtests/testdata/logsCrossShard/event-do-something.json
+++ b/integrationtests/testdata/logsCrossShard/event-do-something.json
@@ -9,5 +9,6 @@
   "shardID": 1,
   "txHash": "63726f73732d6c6f67",
   "order": 1,
-  "timestamp": 6040
+  "timestamp": 6040,
+  "txOrder": 0
 }
diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-destination.json b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json
index 5d4bf49d..c373f9be 100644
--- a/integrationtests/testdata/logsCrossShard/event-transfer-destination.json
+++ b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json
@@ -10,5 +10,6 @@
   "shardID": 1,
   "txHash": "63726f73732d6c6f67",
   "order": 0,
-  "timestamp": 6040
+  "timestamp": 6040,
+  "txOrder": 0
 }
diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json
index d72c8daa..d085f338 100644
--- a/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json
+++ b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json
@@ -8,6 +8,7 @@
     "01"
   ],
   "shardID": 0,
+  "txOrder": 0,
   "txHash": "63726f73732d6c6f67",
   "order": 0,
   "timestamp": 5040
diff --git a/process/elasticproc/converters/field.go b/process/elasticproc/converters/field.go
index d96d3571..27b6be81 100644
--- a/process/elasticproc/converters/field.go
+++ b/process/elasticproc/converters/field.go
@@ -21,7 +21,7 @@ func TruncateFieldIfExceedsMaxLengthBase64(field string) string {
 	return field
 }
 
-//TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded
+// TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded
 func TruncateSliceElementsIfExceedsMaxLength(fields []string) []string {
 	var localFields []string
 	for _, field := range fields {
diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go
index 6e43c30b..9f129a29 100644
--- a/process/elasticproc/elasticProcessor.go
+++ b/process/elasticproc/elasticProcessor.go
@@ -518,18 +518,25 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]*
 }
 
 func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice, shardID uint32) error {
+	logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID)
+	err := ei.indexEvents(eventsDB, buffSlice)
+	if err != nil {
+		return err
+	}
+
 	if !ei.isIndexEnabled(elasticIndexer.LogsIndex) {
 		return nil
 	}
 
-	logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID)
+	return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex)
+}
 
-	err := ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex)
-	if err != nil {
-		return err
+func (ei *elasticProcessor) indexEvents(eventsDB []*data.LogEvent, buffSlice *data.BufferSlice) error {
+	if !ei.isIndexEnabled(elasticIndexer.EventsIndex) {
+		return nil
 	}
 
-	return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex)
+	return ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex)
 }
 
 func (ei *elasticProcessor) indexScDeploys(deployData map[string]*data.ScDeployInfo, changeOwnerOperation map[string]*data.OwnerData, buffSlice *data.BufferSlice) error {
diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor.go b/process/elasticproc/logsevents/logsAndEventsProcessor.go
index 8d358ef5..ee4ab074 100644
--- a/process/elasticproc/logsevents/logsAndEventsProcessor.go
+++ b/process/elasticproc/logsevents/logsAndEventsProcessor.go
@@ -2,7 +2,7 @@ package logsevents
 
 import (
 	"encoding/hex"
-	"encoding/json"
+	"fmt"
 	"time"
 
 	"github.com/multiversx/mx-chain-core-go/core"
@@ -16,6 +16,8 @@ import (
 	"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
 )
 
+const eventIDFormat = "%s-%d-%d"
+
 // ArgsLogsAndEventsProcessor holds all dependencies required to create new instances of logsAndEventsProcessor
 type ArgsLogsAndEventsProcessor struct {
 	PubKeyConverter core.PubkeyConverter
@@ -46,7 +48,6 @@ func NewLogsAndEventsProcessor(args ArgsLogsAndEventsProcessor) (*logsAndEventsP
 		pubKeyConverter:  args.PubKeyConverter,
 		eventsProcessors: eventsProcessors,
 		hasher:           args.Hasher,
-		marshaller:       args.Marshalizer,
 	}, nil
 }
 
@@ -243,18 +244,14 @@ func (lep *logsAndEventsProcessor) prepareLogsForDB(
 		}
 
 		logsDB.Events = append(logsDB.Events, logEvent)
-		dbEvent, ok := lep.prepareLogEvent(logsDB, logEvent, shardID)
-		if !ok {
-			continue
-		}
-
-		dbEvents = append(dbEvents, dbEvent)
+		executionOrder := lep.getExecutionOrder(logHashHex)
+		dbEvents = append(dbEvents, lep.prepareLogEvent(logsDB, logEvent, shardID, executionOrder))
 	}
 
 	return logsDB, dbEvents
 }
 
-func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data.Event, shardID uint32) (*data.LogEvent, bool) {
+func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data.Event, shardID uint32, execOrder int) *data.LogEvent {
 	dbEvent := &data.LogEvent{
 		TxHash:         dbLog.ID,
 		LogAddress:     dbLog.Address,
@@ -265,22 +262,13 @@ func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data
 		Topics:         hexEncodeSlice(event.Topics),
 		Order:          event.Order,
 		ShardID:        shardID,
+		TxOrder:        execOrder,
+		OriginalTxHash: dbLog.OriginalTxHash,
+		Timestamp:      dbLog.Timestamp,
+		ID:             fmt.Sprintf(eventIDFormat, dbLog.ID, shardID, event.Order),
 	}
 
-	dbEventBytes, err := json.Marshal(dbEvent)
-	if err != nil {
-		log.Warn("cannot marshal event",
-			"txHash", dbLog.ID,
-			"order", event.Order,
-			"error", err,
-		)
-	}
-
-	dbEvent.OriginalTxHash = dbLog.OriginalTxHash
-	dbEvent.Timestamp = dbLog.Timestamp
-	dbEvent.ID = hex.EncodeToString(lep.hasher.Compute(string(dbEventBytes)))
-
-	return dbEvent, true
+	return dbEvent
 }
 
 func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string {
@@ -296,6 +284,22 @@ func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string {
 	return ""
 }
 
+func (lep *logsAndEventsProcessor) getExecutionOrder(logHashHex string) int {
+	tx, ok := lep.logsData.txsMap[logHashHex]
+	if ok {
+		return tx.ExecutionOrder
+	}
+
+	scr, ok := lep.logsData.scrsMap[logHashHex]
+	if ok {
+		return scr.ExecutionOrder
+	}
+
+	log.Warn("cannot find hash in the txs map or scrs map", "hash", logHashHex)
+
+	return -1
+}
+
 func hexEncodeSlice(input [][]byte) []string {
 	hexEncoded := make([]string, 0, len(input))
 	for idx := 0; idx < len(input); idx++ {
diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go
index 45551529..bea30c2e 100644
--- a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go
+++ b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go
@@ -338,7 +338,7 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
 	_, eventsDB := proc.PrepareLogsForDB(logsAndEvents, 1234, 1)
 	require.Equal(t, []*data.LogEvent{
 		{
-			ID:             "df358a19d2ed48f29c7fdba5132da589176cc6bb698cd8ee084b4efd6e8a86b3",
+			ID:             "747848617368-1-0",
 			TxHash:         "747848617368",
 			OriginalTxHash: "originalHash",
 			LogAddress:     "61646472657373",
@@ -349,9 +349,10 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
 			Order:          0,
 			ShardID:        1,
 			Timestamp:      1234,
+			TxOrder:        0,
 		},
 		{
-			ID:             "cd4f37eff9d15471034bbaf0886fcf62fa00eecf59410be9bdd2be8d36bab42a",
+			ID:             "747848617368-1-1",
 			TxHash:         "747848617368",
 			OriginalTxHash: "originalHash",
 			LogAddress:     "61646472657373",
@@ -363,6 +364,7 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) {
 			Order:          1,
 			ShardID:        1,
 			Timestamp:      1234,
+			TxOrder:        0,
 		},
 	}, eventsDB)
 }
diff --git a/process/elasticproc/transactions/smartContractResultsProcessor.go b/process/elasticproc/transactions/smartContractResultsProcessor.go
index e5f6a72e..2fcb7ccf 100644
--- a/process/elasticproc/transactions/smartContractResultsProcessor.go
+++ b/process/elasticproc/transactions/smartContractResultsProcessor.go
@@ -189,6 +189,7 @@ func (proc *smartContractResultsProcessor) prepareSmartContractResult(
 		OriginalSender:   originalSenderAddr,
 		InitialTxFee:     feeInfo.Fee.String(),
 		InitialTxGasUsed: feeInfo.GasUsed,
+		ExecutionOrder:   int(scrInfo.ExecutionOrder),
 	}
 }
 
diff --git a/process/elasticproc/transactions/transactionDBBuilder.go b/process/elasticproc/transactions/transactionDBBuilder.go
index 727563ce..0b497ef7 100644
--- a/process/elasticproc/transactions/transactionDBBuilder.go
+++ b/process/elasticproc/transactions/transactionDBBuilder.go
@@ -11,7 +11,6 @@ import (
 	"github.com/multiversx/mx-chain-core-go/data/block"
 	"github.com/multiversx/mx-chain-core-go/data/outport"
 	"github.com/multiversx/mx-chain-core-go/data/receipt"
-	"github.com/multiversx/mx-chain-core-go/data/rewardTx"
 	"github.com/multiversx/mx-chain-es-indexer-go/data"
 	"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
 	"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters"
@@ -122,17 +121,19 @@ func (dtb *dbTransactionBuilder) prepareTransaction(
 		Version:           tx.Version,
 		GuardianAddress:   guardianAddress,
 		GuardianSignature: hex.EncodeToString(tx.GuardianSignature),
+		ExecutionOrder:    int(txInfo.ExecutionOrder),
 	}
 }
 
 func (dtb *dbTransactionBuilder) prepareRewardTransaction(
-	rTx *rewardTx.RewardTx,
+	rTxInfo *outport.RewardInfo,
 	txHash []byte,
 	mbHash []byte,
 	mb *block.MiniBlock,
 	header coreData.HeaderHandler,
 	txStatus string,
 ) *data.Transaction {
+	rTx := rTxInfo.Reward
 	valueNum, err := dtb.balanceConverter.ConvertBigValueToFloat(rTx.Value)
 	if err != nil {
 		log.Warn("dbTransactionBuilder.prepareRewardTransaction cannot compute value as num", "value", rTx.Value,
@@ -142,23 +143,24 @@ func (dtb *dbTransactionBuilder) prepareRewardTransaction(
 	receiverAddr := dtb.addressPubkeyConverter.SilentEncode(rTx.RcvAddr, log)
 
 	return &data.Transaction{
-		Hash:          hex.EncodeToString(txHash),
-		MBHash:        hex.EncodeToString(mbHash),
-		Nonce:         0,
-		Round:         rTx.Round,
-		Value:         rTx.Value.String(),
-		ValueNum:      valueNum,
-		Receiver:      receiverAddr,
-		Sender:        fmt.Sprintf("%d", core.MetachainShardId),
-		ReceiverShard: mb.ReceiverShardID,
-		SenderShard:   mb.SenderShardID,
-		GasPrice:      0,
-		GasLimit:      0,
-		Data:          make([]byte, 0),
-		Signature:     "",
-		Timestamp:     time.Duration(header.GetTimeStamp()),
-		Status:        txStatus,
-		Operation:     rewardsOperation,
+		Hash:           hex.EncodeToString(txHash),
+		MBHash:         hex.EncodeToString(mbHash),
+		Nonce:          0,
+		Round:          rTx.Round,
+		Value:          rTx.Value.String(),
+		ValueNum:       valueNum,
+		Receiver:       receiverAddr,
+		Sender:         fmt.Sprintf("%d", core.MetachainShardId),
+		ReceiverShard:  mb.ReceiverShardID,
+		SenderShard:    mb.SenderShardID,
+		GasPrice:       0,
+		GasLimit:       0,
+		Data:           make([]byte, 0),
+		Signature:      "",
+		Timestamp:      time.Duration(header.GetTimeStamp()),
+		Status:         txStatus,
+		Operation:      rewardsOperation,
+		ExecutionOrder: int(rTxInfo.ExecutionOrder),
 	}
 }
 
diff --git a/process/elasticproc/transactions/transactionDBBuilder_test.go b/process/elasticproc/transactions/transactionDBBuilder_test.go
index 5b3d8759..ceeca3de 100644
--- a/process/elasticproc/transactions/transactionDBBuilder_test.go
+++ b/process/elasticproc/transactions/transactionDBBuilder_test.go
@@ -116,7 +116,10 @@ func TestGetTransactionByType_RewardTx(t *testing.T) {
 	header := &block.Header{Nonce: 2}
 	status := "Success"
 
-	resultTx := cp.prepareRewardTransaction(rwdTx, txHash, mbHash, mb, header, status)
+	rewardInfo := &outport.RewardInfo{
+		Reward: rwdTx,
+	}
+	resultTx := cp.prepareRewardTransaction(rewardInfo, txHash, mbHash, mb, header, status)
 	expectedTx := &data.Transaction{
 		Hash:   hex.EncodeToString(txHash),
 		MBHash: hex.EncodeToString(mbHash),
diff --git a/process/elasticproc/transactions/transactionsGrouper.go b/process/elasticproc/transactions/transactionsGrouper.go
index c6073914..5aaf15a4 100644
--- a/process/elasticproc/transactions/transactionsGrouper.go
+++ b/process/elasticproc/transactions/transactionsGrouper.go
@@ -151,7 +151,7 @@ func (tg *txsGrouper) prepareRewardTxForDB(
 		return nil, false
 	}
 
-	dbTx := tg.txBuilder.prepareRewardTransaction(rtx.Reward, txHash, mbHash, mb, header, mbStatus)
+	dbTx := tg.txBuilder.prepareRewardTransaction(rtx, txHash, mbHash, mb, header, mbStatus)
 
 	return dbTx, true
 }
diff --git a/templates/noKibana/events.go b/templates/noKibana/events.go
index 6fd4789d..bbdce60e 100644
--- a/templates/noKibana/events.go
+++ b/templates/noKibana/events.go
@@ -6,7 +6,7 @@ var Events = Object{
 		"events-*",
 	},
 	"settings": Object{
-		"number_of_shards":   3,
+		"number_of_shards":   5,
 		"number_of_replicas": 0,
 	},
 	"mappings": Object{
@@ -42,6 +42,9 @@ var Events = Object{
 			"order": Object{
 				"type": "long",
 			},
+			"txOrder": Object{
+				"type": "long",
+			},
 			"timestamp": Object{
 				"type": "date",
 				"format": "epoch_second",