stateless log processor
miiu96 committed Mar 26, 2024
1 parent a33494b commit 0e909a4
Showing 5 changed files with 42 additions and 46 deletions.
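
In short, the logs processor becomes stateless: ExtractDataFromLogs builds a local logsData value, threads it through the event processors, and hands the DB-ready logs back in a new DBLogs field on PreparedLogsResults, so the separate PrepareLogsForDB step disappears from the public interface. A rough sketch of the resulting call flow, pieced together from the hunks below (the local variable names are illustrative, not taken from the repository):

    // Inside elasticProcessor.SaveTransactions, roughly:
    logsData := ei.logsAndEventsProc.ExtractDataFromLogs(obh.TransactionPool.Logs, preparedResults, headerTimestamp, shardID, numOfShards)
    // The prepared logs now travel back inside the results struct,
    // so indexing consumes them directly instead of re-deriving them.
    err = ei.prepareAndIndexLogs(logsData.DBLogs, buffers)
    if err != nil {
        return err
    }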
1 change: 1 addition & 0 deletions data/logs.go
@@ -37,4 +37,5 @@ type PreparedLogsResults struct {
     TokensInfo []*TokenInfo
     NFTsDataUpdates []*NFTDataUpdate
     TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties
+    DBLogs []*Logs
 }
6 changes: 2 additions & 4 deletions process/elasticproc/elasticProcessor.go
@@ -424,7 +424,7 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader
         return err
     }
 
-    err = ei.prepareAndIndexLogs(obh.TransactionPool.Logs, headerTimestamp, buffers)
+    err = ei.prepareAndIndexLogs(logsData.DBLogs, buffers)
     if err != nil {
         return err
     }
@@ -511,13 +511,11 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]*
     return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex)
 }
 
-func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice) error {
+func (ei *elasticProcessor) prepareAndIndexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error {
     if !ei.isIndexEnabled(elasticIndexer.LogsIndex) {
         return nil
     }
 
-    logsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp)
-
     return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex)
 }
1 change: 0 additions & 1 deletion process/elasticproc/interface.go
@@ -91,7 +91,6 @@ type DBValidatorsHandler interface {
 
 // DBLogsAndEventsHandler defines the actions that a logs and events handler should do
 type DBLogsAndEventsHandler interface {
-    PrepareLogsForDB(logsAndEvents []*outport.LogData, timestamp uint64) []*data.Logs
     ExtractDataFromLogs(
         logsAndEvents []*outport.LogData,
         preparedResults *data.PreparedResults,
75 changes: 37 additions & 38 deletions process/elasticproc/logsevents/logsAndEventsProcessor.go
@@ -26,8 +26,6 @@ type logsAndEventsProcessor struct {
     hasher hashing.Hasher
     pubKeyConverter core.PubkeyConverter
     eventsProcessors []eventsProcessor
-
-    logsData *logsData
 }
 
 // NewLogsAndEventsProcessor will create a new instance for the logsAndEventsProcessor
@@ -93,85 +91,85 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs(
     shardID uint32,
     numOfShards uint32,
 ) *data.PreparedLogsResults {
-    lep.logsData = newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults)
-
+    lgData := newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults)
     for _, txLog := range logsAndEvents {
         if txLog == nil {
             continue
         }
 
         events := txLog.Log.Events
-        lep.processEvents(txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards)
+        lep.processEvents(lgData, txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards)
 
-        tx, ok := lep.logsData.txsMap[txLog.TxHash]
+        tx, ok := lgData.txsMap[txLog.TxHash]
         if ok {
             tx.HasLogs = true
             continue
         }
-        scr, ok := lep.logsData.scrsMap[txLog.TxHash]
+        scr, ok := lgData.scrsMap[txLog.TxHash]
         if ok {
             scr.HasLogs = true
             continue
         }
     }
 
     return &data.PreparedLogsResults{
-        Tokens: lep.logsData.tokens,
-        ScDeploys: lep.logsData.scDeploys,
-        TokensInfo: lep.logsData.tokensInfo,
-        TokensSupply: lep.logsData.tokensSupply,
-        Delegators: lep.logsData.delegators,
-        NFTsDataUpdates: lep.logsData.nftsDataUpdates,
-        TokenRolesAndProperties: lep.logsData.tokenRolesAndProperties,
-        TxHashStatusInfo: lep.logsData.txHashStatusInfoProc.getAllRecords(),
-        ChangeOwnerOperations: lep.logsData.changeOwnerOperations,
+        Tokens: lgData.tokens,
+        ScDeploys: lgData.scDeploys,
+        TokensInfo: lgData.tokensInfo,
+        TokensSupply: lgData.tokensSupply,
+        Delegators: lgData.delegators,
+        NFTsDataUpdates: lgData.nftsDataUpdates,
+        TokenRolesAndProperties: lgData.tokenRolesAndProperties,
+        TxHashStatusInfo: lgData.txHashStatusInfoProc.getAllRecords(),
+        ChangeOwnerOperations: lgData.changeOwnerOperations,
+        DBLogs: lep.prepareLogsForDB(lgData, logsAndEvents, timestamp),
     }
 }
 
-func (lep *logsAndEventsProcessor) processEvents(logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) {
+func (lep *logsAndEventsProcessor) processEvents(lgData *logsData, logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) {
     for _, event := range events {
         if check.IfNil(event) {
             continue
         }
 
-        lep.processEvent(logHashHexEncoded, logAddress, event, shardID, numOfShards)
+        lep.processEvent(lgData, logHashHexEncoded, logAddress, event, shardID, numOfShards)
     }
 }
 
-func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) {
+func (lep *logsAndEventsProcessor) processEvent(lgData *logsData, logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) {
     for _, proc := range lep.eventsProcessors {
         res := proc.processEvent(&argsProcessEvent{
             event: event,
             txHashHexEncoded: logHashHexEncoded,
             logAddress: logAddress,
-            tokens: lep.logsData.tokens,
-            tokensSupply: lep.logsData.tokensSupply,
-            timestamp: lep.logsData.timestamp,
-            scDeploys: lep.logsData.scDeploys,
-            txs: lep.logsData.txsMap,
-            scrs: lep.logsData.scrsMap,
-            tokenRolesAndProperties: lep.logsData.tokenRolesAndProperties,
-            txHashStatusInfoProc: lep.logsData.txHashStatusInfoProc,
-            changeOwnerOperations: lep.logsData.changeOwnerOperations,
+            tokens: lgData.tokens,
+            tokensSupply: lgData.tokensSupply,
+            timestamp: lgData.timestamp,
+            scDeploys: lgData.scDeploys,
+            txs: lgData.txsMap,
+            scrs: lgData.scrsMap,
+            tokenRolesAndProperties: lgData.tokenRolesAndProperties,
+            txHashStatusInfoProc: lgData.txHashStatusInfoProc,
+            changeOwnerOperations: lgData.changeOwnerOperations,
             selfShardID: shardID,
             numOfShards: numOfShards,
         })
         if res.tokenInfo != nil {
-            lep.logsData.tokensInfo = append(lep.logsData.tokensInfo, res.tokenInfo)
+            lgData.tokensInfo = append(lgData.tokensInfo, res.tokenInfo)
         }
         if res.delegator != nil {
-            lep.logsData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator
+            lgData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator
         }
         if res.updatePropNFT != nil {
-            lep.logsData.nftsDataUpdates = append(lep.logsData.nftsDataUpdates, res.updatePropNFT)
+            lgData.nftsDataUpdates = append(lgData.nftsDataUpdates, res.updatePropNFT)
         }
 
-        tx, ok := lep.logsData.txsMap[logHashHexEncoded]
+        tx, ok := lgData.txsMap[logHashHexEncoded]
         if ok {
             tx.HasOperations = true
             continue
         }
-        scr, ok := lep.logsData.scrsMap[logHashHexEncoded]
+        scr, ok := lgData.scrsMap[logHashHexEncoded]
         if ok {
             scr.HasOperations = true
             continue
@@ -183,8 +181,8 @@ func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAdd
     }
 }
 
-// PrepareLogsForDB will prepare logs for database
-func (lep *logsAndEventsProcessor) PrepareLogsForDB(
+func (lep *logsAndEventsProcessor) prepareLogsForDB(
+    lgData *logsData,
     logsAndEvents []*outport.LogData,
     timestamp uint64,
 ) []*data.Logs {
@@ -195,19 +193,20 @@ func (lep *logsAndEventsProcessor) PrepareLogsForDB(
             continue
         }
 
-        logs = append(logs, lep.prepareLogsForDB(txLog.TxHash, txLog.Log, timestamp))
+        logs = append(logs, lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp))
     }
 
     return logs
 }
 
-func (lep *logsAndEventsProcessor) prepareLogsForDB(
+func (lep *logsAndEventsProcessor) prepareLog(
+    lgData *logsData,
     logHashHex string,
     eventLogs *transaction.Log,
     timestamp uint64,
 ) *data.Logs {
     originalTxHash := ""
-    scr, ok := lep.logsData.scrsMap[logHashHex]
+    scr, ok := lgData.scrsMap[logHashHex]
     if ok {
         originalTxHash = scr.OriginalTxHash
     }
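
The pattern driving this file's changes: the per-block scratch state that used to live on the processor (lep.logsData) is now created at the top of ExtractDataFromLogs and passed explicitly to every helper, so nothing is retained on the receiver between calls. A minimal, generic sketch of that shape, assuming nothing beyond the standard library (all names here are illustrative, not from the repository):

    package main

    import "fmt"

    // processor keeps only long-lived collaborators; no per-call state lives here.
    type processor struct{}

    // callState is the scratch data for a single Extract call, analogous to logsData.
    type callState struct {
        seen map[string]bool
    }

    // Extract creates fresh state for each call and threads it through the helpers,
    // so repeated or concurrent calls cannot observe each other's data.
    func (p *processor) Extract(items []string) *callState {
        st := &callState{seen: map[string]bool{}}
        for _, it := range items {
            p.handle(st, it) // state travels as an argument, never as a field on p
        }
        return st
    }

    func (p *processor) handle(st *callState, it string) {
        st.seen[it] = true
    }

    func main() {
        p := &processor{}
        fmt.Println(p.Extract([]string{"a", "b"}).seen) // map[a:true b:true]
    }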
5 changes: 2 additions & 3 deletions process/elasticproc/logsevents/logsAndEventsProcessor_test.go
@@ -229,14 +229,13 @@ func TestLogsAndEventsProcessor_PrepareLogsForDB(t *testing.T) {
     args := createMockArgs()
     proc, _ := NewLogsAndEventsProcessor(args)
 
-    _ = proc.ExtractDataFromLogs(nil, &data.PreparedResults{ScResults: []*data.ScResult{
+    result := proc.ExtractDataFromLogs(logsAndEvents, &data.PreparedResults{ScResults: []*data.ScResult{
         {
             Hash: "747848617368",
             OriginalTxHash: "orignalHash",
         },
     }}, 1234, 0, 3)
 
-    logsDB := proc.PrepareLogsForDB(logsAndEvents, 1234)
     require.Equal(t, &data.Logs{
         ID: "747848617368",
         Address: "61646472657373",
@@ -250,7 +249,7 @@ func TestLogsAndEventsProcessor_PrepareLogsForDB(t *testing.T) {
                 AdditionalData: [][]byte{[]byte("something")},
             },
         },
-    }, logsDB[0])
+    }, result.DBLogs[0])
 }
 
 func TestLogsAndEventsProcessor_ExtractDataFromLogsNFTBurn(t *testing.T) {