Skip to content

Commit

Permalink
Tail lock revisited (#1328)
Browse files Browse the repository at this point in the history
* avoid modifying the pointer

* improve readability

* prevent race in tail reading

* improve repairTxIndexTail readability

* use shared func
  • Loading branch information
ceyonur authored Sep 2, 2024
1 parent 356a93d commit e407ce4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 33 deletions.
16 changes: 5 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func NewBlockChain(
// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TransactionHistory != 0 {
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.setTxIndexTail(latestStateSynced)
bc.repairTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
Expand Down Expand Up @@ -2124,7 +2124,7 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
if bc.cacheConfig.TransactionHistory != 0 {
bc.setTxIndexTail(block.NumberU64())
bc.repairTxIndexTail(block.NumberU64())
}

// Update all in-memory chain markers
Expand Down Expand Up @@ -2160,18 +2160,12 @@ func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}

func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
func (bc *BlockChain) repairTxIndexTail(newTail uint64) error {
bc.txIndexTailLock.Lock()
defer bc.txIndexTailLock.Unlock()

tailP := rawdb.ReadTxIndexTail(bc.db)
var tailV uint64
if tailP != nil {
tailV = *tailP
}

if newTail > tailV {
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
if curr := rawdb.ReadTxIndexTail(bc.db); curr == nil || *curr < newTail {
log.Info("Repairing tx index tail", "old", curr, "new", newTail)
rawdb.WriteTxIndexTail(bc.db, newTail)
}
return nil
Expand Down
47 changes: 25 additions & 22 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,9 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
// possible, the done channel will be closed once the task is finished.
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
start := time.Now()
indexer.chain.txIndexTailLock.Lock()
defer func() {
txUnindexTimer.Inc(time.Since(start).Milliseconds())
indexer.chain.txIndexTailLock.Unlock()
close(done)
indexer.chain.wg.Done()
}()

// Short circuit if chain is empty and nothing to index.
Expand All @@ -99,33 +96,27 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don
}

// Defensively ensure tail is not nil.
if tail == nil {
tail = new(uint64)
tailValue := uint64(0)
if tail != nil {
// use intermediate variable to avoid modifying the pointer
tailValue = *tail
}

if head-indexer.limit+1 >= *tail {
if head-indexer.limit+1 >= tailValue {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
rawdb.UnindexTransactions(indexer.db, tailValue, head-indexer.limit+1, stop, false)
}
}

// loop is the scheduler of the indexer, assigning indexing/unindexing tasks depending
// on the received chain event.
func (indexer *txIndexer) loop(chain *BlockChain) {
defer close(indexer.closed)

// If the user just upgraded to a new version which supports transaction
// index pruning, write the new tail and remove anything older.
if rawdb.ReadTxIndexTail(indexer.db) == nil {
rawdb.WriteTxIndexTail(indexer.db, 0)
}

// Listening to chain events and manipulate the transaction indexes.
var (
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)

headCh = make(chan ChainEvent)
sub = chain.SubscribeChainAcceptedEvent(headCh)
Expand All @@ -145,7 +136,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
done = make(chan struct{})
lastHead = head.Number.Uint64()
indexer.chain.wg.Add(1)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Number.Uint64(), stop, done)
go func() {
indexer.lockedRun(head.Number.Uint64(), stop, done)
}()
}
for {
select {
Expand All @@ -159,15 +152,16 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
stop = make(chan struct{})
done = make(chan struct{})
indexer.chain.wg.Add(1)
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done)
go func() {
indexer.lockedRun(headNum, stop, done)
}()
}
lastHead = head.Block.NumberU64()
case <-done:
stop = nil
done = nil
lastTail = rawdb.ReadTxIndexTail(indexer.db)
case ch := <-indexer.progress:
ch <- indexer.report(lastHead, lastTail)
ch <- indexer.report(lastHead, rawdb.ReadTxIndexTail(indexer.db))
case ch := <-indexer.term:
if stop != nil {
close(stop)
Expand Down Expand Up @@ -213,3 +207,12 @@ func (indexer *txIndexer) close() {
case <-indexer.closed:
}
}

// lockedRun runs the indexing/unindexing task in a locked manner. It reads
// the current tail index from the database.
func (indexer *txIndexer) lockedRun(head uint64, stop chan struct{}, done chan struct{}) {
indexer.chain.txIndexTailLock.Lock()
indexer.run(rawdb.ReadTxIndexTail(indexer.db), head, stop, done)
indexer.chain.txIndexTailLock.Unlock()
indexer.chain.wg.Done()
}

0 comments on commit e407ce4

Please sign in to comment.