Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tail lock revisited #1328

Merged
merged 7 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func NewBlockChain(
// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TransactionHistory != 0 {
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.setTxIndexTail(latestStateSynced)
bc.repairTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
Expand Down Expand Up @@ -2147,7 +2147,7 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TransactionHistory != 0 {
bc.setTxIndexTail(block.NumberU64())
bc.repairTxIndexTail(block.NumberU64())
}

// Update all in-memory chain markers
Expand Down Expand Up @@ -2183,18 +2183,12 @@ func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}

func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
func (bc *BlockChain) repairTxIndexTail(newTail uint64) error {
bc.txIndexTailLock.Lock()
defer bc.txIndexTailLock.Unlock()

tailP := rawdb.ReadTxIndexTail(bc.db)
var tailV uint64
if tailP != nil {
tailV = *tailP
}

if newTail > tailV {
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
if currentTail := rawdb.ReadTxIndexTail(bc.db); currentTail == nil || newTail > *currentTail {
ARR4N marked this conversation as resolved.
Show resolved Hide resolved
log.Info("Repairing tx index tail", "old", currentTail, "new", newTail)
rawdb.WriteTxIndexTail(bc.db, newTail)
}
return nil
Expand Down
44 changes: 22 additions & 22 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,9 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
// possible, the done channel will be closed once the task is finished.
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
start := time.Now()
indexer.chain.txIndexTailLock.Lock()
defer func() {
txUnindexTimer.Inc(time.Since(start).Milliseconds())
indexer.chain.txIndexTailLock.Unlock()
close(done)
indexer.chain.wg.Done()
}()

// Short circuit if chain is empty and nothing to index.
Expand All @@ -99,33 +96,27 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don
}

// Defensively ensure tail is not nil.
if tail == nil {
tail = new(uint64)
tailValue := uint64(0)
ARR4N marked this conversation as resolved.
Show resolved Hide resolved
if tail != nil {
// use intermediate variable to avoid modifying the pointer
tailValue = *tail
}

if head-indexer.limit+1 >= *tail {
if head-indexer.limit+1 >= tailValue {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
rawdb.UnindexTransactions(indexer.db, tailValue, head-indexer.limit+1, stop, false)
}
}

// loop is the scheduler of the indexer, assigning indexing/unindexing tasks depending
// on the received chain event.
func (indexer *txIndexer) loop(chain *BlockChain) {
defer close(indexer.closed)

// If the user just upgraded to a new version which supports transaction
// index pruning, write the new tail and remove anything older.
if rawdb.ReadTxIndexTail(indexer.db) == nil {
rawdb.WriteTxIndexTail(indexer.db, 0)
}

// Listening to chain events and manipulate the transaction indexes.
var (
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)

headCh = make(chan ChainEvent)
sub = chain.SubscribeChainAcceptedEvent(headCh)
Expand All @@ -145,7 +136,12 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
done = make(chan struct{})
lastHead = head.Number.Uint64()
indexer.chain.wg.Add(1)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Number.Uint64(), stop, done)
go func() {
ARR4N marked this conversation as resolved.
Show resolved Hide resolved
indexer.chain.txIndexTailLock.Lock()
indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Number.Uint64(), stop, done)
indexer.chain.txIndexTailLock.Unlock()
indexer.chain.wg.Done()
}()
}
for {
select {
Expand All @@ -159,15 +155,19 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
stop = make(chan struct{})
done = make(chan struct{})
indexer.chain.wg.Add(1)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this must always be the same as the equivalent go routine then consider abstracting it into shared code, otherwise all LGTM.

indexer.chain.txIndexTailLock.Lock()
indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
indexer.chain.txIndexTailLock.Unlock()
indexer.chain.wg.Done()
}()
}
lastHead = head.Block.NumberU64()
case <-done:
stop = nil
done = nil
lastTail = rawdb.ReadTxIndexTail(indexer.db)
case ch := <-indexer.progress:
ch <- indexer.report(lastHead, lastTail)
ch <- indexer.report(lastHead, rawdb.ReadTxIndexTail(indexer.db))
case ch := <-indexer.term:
if stop != nil {
close(stop)
Expand Down
Loading