Skip to content

Commit

Permalink
perf: update current block number when loop read logs
Browse files Browse the repository at this point in the history
  • Loading branch information
CanvasL committed Nov 6, 2023
1 parent a21fd02 commit 6317e23
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 41 deletions.
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ contracts:
CollectPaidMw:
address: "0xb09ae63a2fd28686a0f386d1ddfd4b53687bf298"
start_at: 25173248
query_history: true
query_history: false
ProfileNFT:
address: "0x2723522702093601e6360cae665518c4f63e9da6"
start_at: 21275193
Expand Down
22 changes: 12 additions & 10 deletions dao/mysql/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,25 @@ func InsertCollectPaidMwSetEvent(param *model.ParamCollectPaidMwSetEvent) error
}

func GetCollectPaidMwSetEventParams(chainID uint64, profileID string, essenceID string) (*model.ParamCollectPaidMwSetEvent, error) {
collectInfoList := new(model.ParamCollectPaidMwSetEvent)
collectInfo := new(model.ParamCollectPaidMwSetEvent)
sqlStr := "SELECT chain_name, chain_id, block_number, tx_hash, namespace, profile_id, essence_id, total_supply, amount, recipient, currency, subscribe_required FROM collect_paid_mw_set_events WHERE chain_id = ? AND profile_id = ? AND essence_id = ?"
err := db.Get(collectInfoList, sqlStr, chainID, profileID, essenceID)
if(err != nil) {
err := db.Get(collectInfo, sqlStr, chainID, profileID, essenceID)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
} else {
return nil, err
}
}
return collectInfoList, nil
return collectInfo, nil
}

func GetLatestTrackedCollectPaidMwSetBlockNumber(chainID uint64) (uint64, error) {
collectInfoList := make([]*model.ParamCollectPaidMwSetEvent, 0, 2)
collectInfo := new(model.ParamCollectPaidMwSetEvent)
sqlStr := "SELECT block_number FROM collect_paid_mw_set_events WHERE chain_id = ? ORDER BY id DESC LIMIT 1"
err := db.Select(&collectInfoList, sqlStr, chainID)
if(err != nil) {
return 0 ,err
err := db.Get(collectInfo, sqlStr, chainID)
if err != nil {
return 0, err
}
return collectInfoList[0].BlockNumber, nil
}
return collectInfo.BlockNumber, nil
}
20 changes: 7 additions & 13 deletions dao/mysql/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mysql

import (
"cyber-events-tracker/model"
"database/sql"
)

func InsertCreateProfileEvent(param *model.ParamCreateProfileEvent) (err error) {
Expand All @@ -24,20 +23,15 @@ func GetCreateProfileEventParams(chainID uint64, account string) ([]*model.Param
profileInfoList := make([]*model.ParamCreateProfileEvent, 0, 2)
sqlStr := "SELECT chain_name, chain_id, block_number, tx_hash, `to`, profile_id, handle, avatar, metadata FROM create_profile_events WHERE chain_id = ? AND `to` = ? ORDER BY block_number DESC"
err := db.Select(&profileInfoList, sqlStr, chainID, account)
if(err != nil) {
if err == sql.ErrNoRows {
return []*model.ParamCreateProfileEvent{}, nil
}
}
return profileInfoList, nil
return profileInfoList, err
}

func GetLatestTrackedCreateProfileBlockNumber(chainID uint64) (uint64, error) {
profileInfoList := make([]*model.ParamCreateProfileEvent, 0, 2)
profileInfo := new(model.ParamCreateProfileEvent)
sqlStr := "SELECT block_number FROM create_profile_events WHERE chain_id = ? ORDER BY id DESC LIMIT 1"
err := db.Select(&profileInfoList, sqlStr, chainID)
if(err != nil) {
return 0 ,err
err := db.Get(profileInfo, sqlStr, chainID)
if err != nil {
return 0, err
}
return profileInfoList[0].BlockNumber, nil
}
return profileInfo.BlockNumber, nil
}
28 changes: 19 additions & 9 deletions listener/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func CollectPaidMwSetEventListener(chainID uint64, contractAddress common.Addres
if err != nil {
log.Fatalf("[%s]: SubscribeFilterLogs CollectPaidMwSet failed, %v", chainID, err)
}
log.Printf("[%d] Chan CollectPaidMwSet started.", chainID)
log.Printf("[%d]: Chan CollectPaidMwSet started.", chainID)

for {
select {
Expand All @@ -69,13 +69,6 @@ func QueryCollectPaidMwSetEvents(chainID uint64, contractAddress common.Address,
return
}

_currentBlockNumber, err := ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
}

currentBlockNumber := big.NewInt(int64(_currentBlockNumber))

var _startAt *big.Int
var _endAt *big.Int = big.NewInt(0)
previousAt, err := logic.GetPreviousTrackedCollectPaidMwSetBlockNumber(chainID)
Expand All @@ -92,6 +85,16 @@ func QueryCollectPaidMwSetEvents(chainID uint64, contractAddress common.Address,
log.Printf("[%d]: Continue query CollectPaidMwSet events at [%d]...", chainID, _startAt.Uint64())
}

var _currentBlockNumber uint64
var currentBlockNumber *big.Int

_currentBlockNumber, err = ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
return
}
currentBlockNumber = big.NewInt(int64(_currentBlockNumber))

for {
_endAt.Add(_startAt, big.NewInt(49999))
if _endAt.Cmp(currentBlockNumber) > 0 {
Expand All @@ -113,13 +116,20 @@ func QueryCollectPaidMwSetEvents(chainID uint64, contractAddress common.Address,

for _, historyLog := range historyLogs {
err = logic.SetCollectInfo(chainID, historyLog)
if(err != nil) {
if err != nil {
log.Fatalf("[%d]: SetCollectInfo failed, %v", chainID, err)
}
}

time.Sleep(200 * time.Millisecond)

_currentBlockNumber, err = ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
} else {
currentBlockNumber = big.NewInt(int64(_currentBlockNumber))
}

if _endAt.Cmp(currentBlockNumber) == 0 {
break
}
Expand Down
26 changes: 18 additions & 8 deletions listener/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func CreateProfileEventListener(chainID uint64, contractAddress common.Address)
if err != nil {
log.Fatalf("[%s]: SubscribeFilterLogs CreateProfile failed, %v", chainID, err)
}
log.Printf("[%d] Chan CreateProfile started.", chainID)
log.Printf("[%d]: Chan CreateProfile started.", chainID)

for {
select {
Expand All @@ -69,13 +69,6 @@ func QueryCreateProfileEvents(chainID uint64, contractAddress common.Address, st
return
}

_currentBlockNumber, err := ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
}

currentBlockNumber := big.NewInt(int64(_currentBlockNumber))

var _startAt *big.Int
var _endAt *big.Int = big.NewInt(0)
previousAt, err := logic.GetPreviousTrackedCreateProfileBlockNumber(chainID)
Expand All @@ -92,6 +85,16 @@ func QueryCreateProfileEvents(chainID uint64, contractAddress common.Address, st
log.Printf("[%d]: Continue query CreateProfile events at [%d]...", chainID, _startAt.Uint64())
}

var _currentBlockNumber uint64
var currentBlockNumber *big.Int

_currentBlockNumber, err = ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
return
}
currentBlockNumber = big.NewInt(int64(_currentBlockNumber))

for {
_endAt.Add(_startAt, big.NewInt(49999))
if _endAt.Cmp(currentBlockNumber) > 0 {
Expand Down Expand Up @@ -120,6 +123,13 @@ func QueryCreateProfileEvents(chainID uint64, contractAddress common.Address, st

time.Sleep(200 * time.Millisecond)

_currentBlockNumber, err = ethClient.BlockNumber(context.Background())
if err != nil {
log.Fatalf("[%d]: Get current BlockNumber failed, %v", chainID, err)
} else {
currentBlockNumber = big.NewInt(int64(_currentBlockNumber))
}

if _endAt.Cmp(currentBlockNumber) == 0 {
break
}
Expand Down

0 comments on commit 6317e23

Please sign in to comment.