From 0e909a4b922a11dee612760c942403c95ef2605a Mon Sep 17 00:00:00 2001 From: miiu Date: Tue, 26 Mar 2024 15:41:59 +0200 Subject: [PATCH 1/4] stateless log processor --- data/logs.go | 1 + process/elasticproc/elasticProcessor.go | 6 +- process/elasticproc/interface.go | 1 - .../logsevents/logsAndEventsProcessor.go | 75 +++++++++---------- .../logsevents/logsAndEventsProcessor_test.go | 5 +- 5 files changed, 42 insertions(+), 46 deletions(-) diff --git a/data/logs.go b/data/logs.go index d27fd466..1b568fe7 100644 --- a/data/logs.go +++ b/data/logs.go @@ -37,4 +37,5 @@ type PreparedLogsResults struct { TokensInfo []*TokenInfo NFTsDataUpdates []*NFTDataUpdate TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties + DBLogs []*Logs } diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go index c38fad5a..7306abf6 100644 --- a/process/elasticproc/elasticProcessor.go +++ b/process/elasticproc/elasticProcessor.go @@ -424,7 +424,7 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader return err } - err = ei.prepareAndIndexLogs(obh.TransactionPool.Logs, headerTimestamp, buffers) + err = ei.prepareAndIndexLogs(logsData.DBLogs, buffers) if err != nil { return err } @@ -511,13 +511,11 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]* return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex) } -func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice) error { +func (ei *elasticProcessor) prepareAndIndexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.LogsIndex) { return nil } - logsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp) - return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex) } diff --git a/process/elasticproc/interface.go b/process/elasticproc/interface.go index c059e6cb..e59357cd 100644 --- a/process/elasticproc/interface.go +++ b/process/elasticproc/interface.go @@ -91,7 +91,6 @@ type DBValidatorsHandler interface { // DBLogsAndEventsHandler defines the actions that a logs and events handler should do type DBLogsAndEventsHandler interface { - PrepareLogsForDB(logsAndEvents []*outport.LogData, timestamp uint64) []*data.Logs ExtractDataFromLogs( logsAndEvents []*outport.LogData, preparedResults *data.PreparedResults, diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor.go b/process/elasticproc/logsevents/logsAndEventsProcessor.go index f633de3c..016b8d93 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor.go @@ -26,8 +26,6 @@ type logsAndEventsProcessor struct { hasher hashing.Hasher pubKeyConverter core.PubkeyConverter eventsProcessors []eventsProcessor - - logsData *logsData } // NewLogsAndEventsProcessor will create a new instance for the logsAndEventsProcessor @@ -93,22 +91,21 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( shardID uint32, numOfShards uint32, ) *data.PreparedLogsResults { - lep.logsData = newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults) - + lgData := newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults) for _, txLog := range logsAndEvents { if txLog == nil { continue } events := txLog.Log.Events - lep.processEvents(txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards) + lep.processEvents(lgData, txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards) - tx, ok := lep.logsData.txsMap[txLog.TxHash] + tx, ok := lgData.txsMap[txLog.TxHash] if ok { tx.HasLogs = true continue } - scr, ok := lep.logsData.scrsMap[txLog.TxHash] + scr, ok := lgData.scrsMap[txLog.TxHash] if ok { scr.HasLogs = true continue @@ -116,62 +113,63 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( } return &data.PreparedLogsResults{ - Tokens: lep.logsData.tokens, - ScDeploys: lep.logsData.scDeploys, - TokensInfo: lep.logsData.tokensInfo, - TokensSupply: lep.logsData.tokensSupply, - Delegators: lep.logsData.delegators, - NFTsDataUpdates: lep.logsData.nftsDataUpdates, - TokenRolesAndProperties: lep.logsData.tokenRolesAndProperties, - TxHashStatusInfo: lep.logsData.txHashStatusInfoProc.getAllRecords(), - ChangeOwnerOperations: lep.logsData.changeOwnerOperations, + Tokens: lgData.tokens, + ScDeploys: lgData.scDeploys, + TokensInfo: lgData.tokensInfo, + TokensSupply: lgData.tokensSupply, + Delegators: lgData.delegators, + NFTsDataUpdates: lgData.nftsDataUpdates, + TokenRolesAndProperties: lgData.tokenRolesAndProperties, + TxHashStatusInfo: lgData.txHashStatusInfoProc.getAllRecords(), + ChangeOwnerOperations: lgData.changeOwnerOperations, + DBLogs: lep.prepareLogsForDB(lgData, logsAndEvents, timestamp), } } -func (lep *logsAndEventsProcessor) processEvents(logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) { +func (lep *logsAndEventsProcessor) processEvents(lgData *logsData, logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) { for _, event := range events { if check.IfNil(event) { continue } - lep.processEvent(logHashHexEncoded, logAddress, event, shardID, numOfShards) + lep.processEvent(lgData, logHashHexEncoded, logAddress, event, shardID, numOfShards) } } -func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) { +func (lep *logsAndEventsProcessor) processEvent(lgData *logsData, logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) { for _, proc := range lep.eventsProcessors { res := proc.processEvent(&argsProcessEvent{ event: event, txHashHexEncoded: logHashHexEncoded, logAddress: logAddress, - tokens: lep.logsData.tokens, - tokensSupply: lep.logsData.tokensSupply, - timestamp: lep.logsData.timestamp, - scDeploys: lep.logsData.scDeploys, - txs: lep.logsData.txsMap, - scrs: lep.logsData.scrsMap, - tokenRolesAndProperties: lep.logsData.tokenRolesAndProperties, - txHashStatusInfoProc: lep.logsData.txHashStatusInfoProc, - changeOwnerOperations: lep.logsData.changeOwnerOperations, + tokens: lgData.tokens, + tokensSupply: lgData.tokensSupply, + timestamp: lgData.timestamp, + scDeploys: lgData.scDeploys, + txs: lgData.txsMap, + scrs: lgData.scrsMap, + tokenRolesAndProperties: lgData.tokenRolesAndProperties, + txHashStatusInfoProc: lgData.txHashStatusInfoProc, + changeOwnerOperations: lgData.changeOwnerOperations, selfShardID: shardID, numOfShards: numOfShards, }) if res.tokenInfo != nil { - lep.logsData.tokensInfo = append(lep.logsData.tokensInfo, res.tokenInfo) + lgData.tokensInfo = append(lgData.tokensInfo, res.tokenInfo) } if res.delegator != nil { - lep.logsData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator + lgData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator } if res.updatePropNFT != nil { - lep.logsData.nftsDataUpdates = append(lep.logsData.nftsDataUpdates, res.updatePropNFT) + lgData.nftsDataUpdates = append(lgData.nftsDataUpdates, res.updatePropNFT) } - tx, ok := lep.logsData.txsMap[logHashHexEncoded] + tx, ok := lgData.txsMap[logHashHexEncoded] if ok { tx.HasOperations = true continue } - scr, ok := lep.logsData.scrsMap[logHashHexEncoded] + scr, ok := lgData.scrsMap[logHashHexEncoded] if ok { scr.HasOperations = true continue @@ -183,8 +181,8 @@ func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAdd } } -// PrepareLogsForDB will prepare logs for database -func (lep *logsAndEventsProcessor) PrepareLogsForDB( +func (lep *logsAndEventsProcessor) prepareLogsForDB( + lgData *logsData, logsAndEvents []*outport.LogData, timestamp uint64, ) []*data.Logs { @@ -195,19 +193,20 @@ func (lep *logsAndEventsProcessor) PrepareLogsForDB( continue } - logs = append(logs, lep.prepareLogsForDB(txLog.TxHash, txLog.Log, timestamp)) + logs = append(logs, lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp)) } return logs } -func (lep *logsAndEventsProcessor) prepareLogsForDB( +func (lep *logsAndEventsProcessor) prepareLog( + lgData *logsData, logHashHex string, eventLogs *transaction.Log, timestamp uint64, ) *data.Logs { originalTxHash := "" - scr, ok := lep.logsData.scrsMap[logHashHex] + scr, ok := lgData.scrsMap[logHashHex] if ok { originalTxHash = scr.OriginalTxHash } diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go index 31e4ea90..8ad3b785 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go @@ -229,14 +229,13 @@ func TestLogsAndEventsProcessor_PrepareLogsForDB(t *testing.T) { args := createMockArgs() proc, _ := NewLogsAndEventsProcessor(args) - _ = proc.ExtractDataFromLogs(nil, &data.PreparedResults{ScResults: []*data.ScResult{ + result := proc.ExtractDataFromLogs(logsAndEvents, &data.PreparedResults{ScResults: []*data.ScResult{ { Hash: "747848617368", OriginalTxHash: "orignalHash", }, }}, 1234, 0, 3) - logsDB := proc.PrepareLogsForDB(logsAndEvents, 1234) require.Equal(t, &data.Logs{ ID: "747848617368", Address: "61646472657373", @@ -250,7 +249,7 @@ func TestLogsAndEventsProcessor_PrepareLogsForDB(t *testing.T) { AdditionalData: [][]byte{[]byte("something")}, }, }, - }, logsDB[0]) + }, result.DBLogs[0]) } func TestLogsAndEventsProcessor_ExtractDataFromLogsNFTBurn(t *testing.T) { From d37c354d35fd5c01c9a1d80a2cb860a48fd86536 Mon Sep 17 00:00:00 2001 From: miiu Date: Tue, 9 Apr 2024 13:50:07 +0300 Subject: [PATCH 2/4] get write index --- client/elasticClient.go | 47 ++++++++++++++++- client/elasticClient_test.go | 52 ++++++++++++++++++- .../response-get-alias-only-one-index.json | 7 +++ client/testsData/response-get-alias.json | 30 +++++++++++ 4 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 client/testsData/response-get-alias-only-one-index.json create mode 100644 client/testsData/response-get-alias.json diff --git a/client/elasticClient.go b/client/elasticClient.go index 3f0a66ed..f8620f6e 100644 --- a/client/elasticClient.go +++ b/client/elasticClient.go @@ -153,8 +153,14 @@ func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body * log.Warn("elasticClient.doRefresh", "cannot do refresh", err) } + writeIndex, err := ec.getWriteIndex(index) + if err != nil { + log.Warn("elasticClient.getWriteIndex", "cannot do get write index", err) + return err + } + res, err := ec.client.DeleteByQuery( - []string{index}, + []string{writeIndex}, body, ec.client.DeleteByQuery.WithIgnoreUnavailable(true), ec.client.DeleteByQuery.WithConflicts(esConflictsPolicy), @@ -323,6 +329,45 @@ func (ec *elasticClient) createAlias(alias string, index string) error { return parseResponse(res, nil, elasticDefaultErrorResponseHandler) } +func (ec *elasticClient) getWriteIndex(alias string) (string, error) { + res, err := ec.client.Indices.GetAlias( + ec.client.Indices.GetAlias.WithIndex(alias), + ) + if err != nil { + return "", err + } + + var indexData map[string]struct { + Aliases map[string]struct { + IsWriteIndex bool `json:"is_write_index"` + } `json:"aliases"` + } + + err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler) + if err != nil { + return "", err + } + + // Iterate over the map and find the write index + var writeIndex string + for index, details := range indexData { + if len(indexData) == 1 { + return index, nil + } + + for _, indexAlias := range details.Aliases { + if indexAlias.IsWriteIndex { + writeIndex = index + break + } + } + if writeIndex != "" { + break + } + } + return writeIndex, nil +} + // UpdateByQuery will update all the documents that match the provided query from the provided index func (ec *elasticClient) UpdateByQuery(ctx context.Context, index string, buff *bytes.Buffer) error { reader := bytes.NewReader(buff.Bytes()) diff --git a/client/elasticClient_test.go b/client/elasticClient_test.go index 96b552ec..c828bb0d 100644 --- a/client/elasticClient_test.go +++ b/client/elasticClient_test.go @@ -2,7 +2,7 @@ package client import ( "context" - "io/ioutil" + "io" "net/http" "net/http/httptest" "os" @@ -53,7 +53,7 @@ func TestElasticClient_DoMultiGet(t *testing.T) { jsonFile, err := os.Open("./testsData/response-multi-get.json") require.Nil(t, err) - byteValue, _ := ioutil.ReadAll(jsonFile) + byteValue, _ := io.ReadAll(jsonFile) _, _ = w.Write(byteValue) } @@ -75,3 +75,51 @@ func TestElasticClient_DoMultiGet(t *testing.T) { _, ok := resMap["docs"] require.True(t, ok) } + +func TestElasticClient_GetWriteIndexMultipleIndicesBehind(t *testing.T) { + handler := http.NotFound + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler(w, r) + })) + defer ts.Close() + + handler = func(w http.ResponseWriter, r *http.Request) { + jsonFile, err := os.Open("./testsData/response-get-alias.json") + require.Nil(t, err) + + byteValue, _ := io.ReadAll(jsonFile) + _, _ = w.Write(byteValue) + } + + esClient, _ := NewElasticClient(elasticsearch.Config{ + Addresses: []string{ts.URL}, + Logger: &logging.CustomLogger{}, + }) + res, err := esClient.getWriteIndex("blocks") + require.Nil(t, err) + require.Equal(t, "blocks-000004", res) +} + +func TestElasticClient_GetWriteIndexOneIndex(t *testing.T) { + handler := http.NotFound + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler(w, r) + })) + defer ts.Close() + + handler = func(w http.ResponseWriter, r *http.Request) { + jsonFile, err := os.Open("./testsData/response-get-alias-only-one-index.json") + require.Nil(t, err) + + byteValue, _ := io.ReadAll(jsonFile) + _, _ = w.Write(byteValue) + } + + esClient, _ := NewElasticClient(elasticsearch.Config{ + Addresses: []string{ts.URL}, + Logger: &logging.CustomLogger{}, + }) + res, err := esClient.getWriteIndex("delegators") + require.Nil(t, err) + require.Equal(t, "delegators-000001", res) +} diff --git a/client/testsData/response-get-alias-only-one-index.json b/client/testsData/response-get-alias-only-one-index.json new file mode 100644 index 00000000..2c28eb19 --- /dev/null +++ b/client/testsData/response-get-alias-only-one-index.json @@ -0,0 +1,7 @@ +{ + "delegators-000001" : { + "aliases" : { + "delegators" : { } + } + } +} \ No newline at end of file diff --git a/client/testsData/response-get-alias.json b/client/testsData/response-get-alias.json new file mode 100644 index 00000000..a62f2670 --- /dev/null +++ b/client/testsData/response-get-alias.json @@ -0,0 +1,30 @@ +{ + "blocks-000003": { + "aliases": { + "blocks": { + "is_write_index": false + } + } + }, + "blocks-000004": { + "aliases": { + "blocks": { + "is_write_index": true + } + } + }, + "blocks-000002": { + "aliases": { + "blocks": { + "is_write_index": false + } + } + }, + "blocks-000001": { + "aliases": { + "blocks": { + "is_write_index": false + } + } + } +} \ No newline at end of file From 83bd2ee3580aa822f08d2ecbfb79d587c839f67a Mon Sep 17 00:00:00 2001 From: miiu Date: Tue, 9 Apr 2024 15:31:10 +0300 Subject: [PATCH 3/4] fixes after review --- client/elasticClient.go | 12 +++--------- .../testsData/response-get-alias-only-one-index.json | 2 +- client/testsData/response-get-alias.json | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/client/elasticClient.go b/client/elasticClient.go index f8620f6e..64f7e30f 100644 --- a/client/elasticClient.go +++ b/client/elasticClient.go @@ -342,14 +342,11 @@ func (ec *elasticClient) getWriteIndex(alias string) (string, error) { IsWriteIndex bool `json:"is_write_index"` } `json:"aliases"` } - err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler) if err != nil { return "", err } - // Iterate over the map and find the write index - var writeIndex string for index, details := range indexData { if len(indexData) == 1 { return index, nil @@ -357,15 +354,12 @@ func (ec *elasticClient) getWriteIndex(alias string) (string, error) { for _, indexAlias := range details.Aliases { if indexAlias.IsWriteIndex { - writeIndex = index - break + return index, nil } } - if writeIndex != "" { - break - } } - return writeIndex, nil + + return alias, nil } // UpdateByQuery will update all the documents that match the provided query from the provided index diff --git a/client/testsData/response-get-alias-only-one-index.json b/client/testsData/response-get-alias-only-one-index.json index 2c28eb19..a56b11b8 100644 --- a/client/testsData/response-get-alias-only-one-index.json +++ b/client/testsData/response-get-alias-only-one-index.json @@ -4,4 +4,4 @@ "delegators" : { } } } -} \ No newline at end of file +} diff --git a/client/testsData/response-get-alias.json b/client/testsData/response-get-alias.json index a62f2670..2c29231e 100644 --- a/client/testsData/response-get-alias.json +++ b/client/testsData/response-get-alias.json @@ -27,4 +27,4 @@ } } } -} \ No newline at end of file +} From 68f92bb4f90070f7e36a11f96bd672c406de5d5b Mon Sep 17 00:00:00 2001 From: miiu Date: Mon, 15 Apr 2024 15:46:34 +0300 Subject: [PATCH 4/4] fixes after merge --- data/logs.go | 1 + process/elasticproc/elasticProcessor.go | 16 ++++++------ process/elasticproc/interface.go | 1 - .../logsevents/logsAndEventsProcessor.go | 25 +++++++++++-------- .../logsevents/logsAndEventsProcessor_test.go | 9 +++---- 5 files changed, 25 insertions(+), 27 deletions(-) diff --git a/data/logs.go b/data/logs.go index 1b568fe7..2daf0179 100644 --- a/data/logs.go +++ b/data/logs.go @@ -38,4 +38,5 @@ type PreparedLogsResults struct { NFTsDataUpdates []*NFTDataUpdate TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties DBLogs []*Logs + DBEvents []*LogEvent } diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go index 17efc8c1..640e5f09 100644 --- a/process/elasticproc/elasticProcessor.go +++ b/process/elasticproc/elasticProcessor.go @@ -430,7 +430,12 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader return err } - err = ei.prepareAndIndexLogs(logsData.DBLogs, buffers, obh.ShardID) + err = ei.indexLogs(logsData.DBLogs, buffers) + if err != nil { + return err + } + + err = ei.indexEvents(logsData.DBEvents, buffers) if err != nil { return err } @@ -517,14 +522,7 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]* return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex) } -func (ei *elasticProcessor) prepareAndIndexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { -func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice, shardID uint32) error { - logsDB, eventsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp, shardID) - err := ei.indexEvents(eventsDB, buffSlice) - if err != nil { - return err - } - +func (ei *elasticProcessor) indexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.LogsIndex) { return nil } diff --git a/process/elasticproc/interface.go b/process/elasticproc/interface.go index ae1f9c1d..997697b2 100644 --- a/process/elasticproc/interface.go +++ b/process/elasticproc/interface.go @@ -91,7 +91,6 @@ type DBValidatorsHandler interface { // DBLogsAndEventsHandler defines the actions that a logs and events handler should do type DBLogsAndEventsHandler interface { - PrepareLogsForDB(logsAndEvents []*outport.LogData, timestamp uint64, shardID uint32) ([]*data.Logs, []*data.LogEvent) ExtractDataFromLogs( logsAndEvents []*outport.LogData, preparedResults *data.PreparedResults, diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor.go b/process/elasticproc/logsevents/logsAndEventsProcessor.go index 02928d16..e048d2e6 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor.go @@ -117,6 +117,8 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( } } + dbLogs, dbEvents := lep.prepareLogsForDB(lgData, logsAndEvents, timestamp, shardID) + return &data.PreparedLogsResults{ Tokens: lgData.tokens, ScDeploys: lgData.scDeploys, @@ -127,7 +129,8 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( TokenRolesAndProperties: lgData.tokenRolesAndProperties, TxHashStatusInfo: lgData.txHashStatusInfoProc.getAllRecords(), ChangeOwnerOperations: lgData.changeOwnerOperations, - DBLogs: lep.prepareLogsForDB(lgData, logsAndEvents, timestamp), + DBLogs: dbLogs, + DBEvents: dbEvents, } } @@ -200,11 +203,10 @@ func (lep *logsAndEventsProcessor) prepareLogsForDB( continue } - dbLog, logEvents := lep.prepareLogsForDB(txLog.TxHash, txLog.Log, timestamp, shardID) + dbLog, logEvents := lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp, shardID) logs = append(logs, dbLog) events = append(events, logEvents...) - } return logs, events @@ -215,8 +217,9 @@ func (lep *logsAndEventsProcessor) prepareLog( logHashHex string, eventLogs *transaction.Log, timestamp uint64, + shardID uint32, ) (*data.Logs, []*data.LogEvent) { - originalTxHash := lep.getOriginalTxHash(logHashHex) + originalTxHash := lep.getOriginalTxHash(lgData, logHashHex) encodedAddr := lep.pubKeyConverter.SilentEncode(eventLogs.GetAddress(), log) logsDB := &data.Logs{ ID: logHashHex, @@ -242,7 +245,7 @@ func (lep *logsAndEventsProcessor) prepareLog( } logsDB.Events = append(logsDB.Events, logEvent) - executionOrder := lep.getExecutionOrder(logHashHex) + executionOrder := lep.getExecutionOrder(lgData, logHashHex) dbEvents = append(dbEvents, lep.prepareLogEvent(logsDB, logEvent, shardID, executionOrder)) } @@ -269,12 +272,12 @@ func (lep *logsAndEventsProcessor) prepareLogEvent(dbLog *data.Logs, event *data return dbEvent } -func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string { - if lep.logsData.scrsMap == nil { +func (lep *logsAndEventsProcessor) getOriginalTxHash(lgData *logsData, logHashHex string) string { + if lgData.scrsMap == nil { return "" } - scr, ok := lep.logsData.scrsMap[logHashHex] + scr, ok := lgData.scrsMap[logHashHex] if ok { return scr.OriginalTxHash } @@ -282,13 +285,13 @@ func (lep *logsAndEventsProcessor) getOriginalTxHash(logHashHex string) string { return "" } -func (lep *logsAndEventsProcessor) getExecutionOrder(logHashHex string) int { - tx, ok := lep.logsData.txsMap[logHashHex] +func (lep *logsAndEventsProcessor) getExecutionOrder(lgData *logsData, logHashHex string) int { + tx, ok := lgData.txsMap[logHashHex] if ok { return tx.ExecutionOrder } - scr, ok := lep.logsData.scrsMap[logHashHex] + scr, ok := lgData.scrsMap[logHashHex] if ok { return scr.ExecutionOrder } diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go index 35d7f9bc..321e007b 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor_test.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor_test.go @@ -236,7 +236,6 @@ func TestLogsAndEventsProcessor_PrepareLogsForDB(t *testing.T) { }, }}, 1234, 0, 3) - logsDB, _ := proc.PrepareLogsForDB(logsAndEvents, 1234, 0) require.Equal(t, &data.Logs{ ID: "747848617368", Address: "61646472657373", @@ -301,7 +300,6 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) { t.Parallel() logsAndEvents := []*outport.LogData{ - nil, { TxHash: hex.EncodeToString([]byte("txHash")), Log: &transaction.Log{ @@ -328,14 +326,13 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) { args := createMockArgs() proc, _ := NewLogsAndEventsProcessor(args) - _ = proc.ExtractDataFromLogs(nil, &data.PreparedResults{ScResults: []*data.ScResult{ + results := proc.ExtractDataFromLogs(logsAndEvents, &data.PreparedResults{ScResults: []*data.ScResult{ { Hash: "747848617368", OriginalTxHash: "originalHash", }, - }}, 1234, 0, 3) + }}, 1234, 1, 3) - _, eventsDB := proc.PrepareLogsForDB(logsAndEvents, 1234, 1) require.Equal(t, []*data.LogEvent{ { ID: "747848617368-1-0", @@ -366,7 +363,7 @@ func TestPrepareLogsAndEvents_LogEvents(t *testing.T) { Timestamp: 1234, TxOrder: 0, }, - }, eventsDB) + }, results.DBEvents) } func TestHexEncodeSlice(t *testing.T) {