From e407ce4b710f1aad1a4abddefa03ffaf2569ffce Mon Sep 17 00:00:00 2001 From: Ceyhun Onur Date: Mon, 2 Sep 2024 17:32:57 +0300 Subject: [PATCH] Tail lock revisited (#1328) * avoid modifying the pointer * improve readability * prevent race in tail reading * improve repairTxIndexTail readability * use shared func --- core/blockchain.go | 16 +++++----------- core/txindexer.go | 47 ++++++++++++++++++++++++---------------------- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index d9c81b5a20..a1272336e3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -459,7 +459,7 @@ func NewBlockChain( // if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail. if bc.cacheConfig.TransactionHistory != 0 { latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db) - bc.setTxIndexTail(latestStateSynced) + bc.repairTxIndexTail(latestStateSynced) } // Start processing accepted blocks effects in the background @@ -2124,7 +2124,7 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { // if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail. if bc.cacheConfig.TransactionHistory != 0 { - bc.setTxIndexTail(block.NumberU64()) + bc.repairTxIndexTail(block.NumberU64()) } // Update all in-memory chain markers @@ -2160,18 +2160,12 @@ func (bc *BlockChain) CacheConfig() *CacheConfig { return bc.cacheConfig } -func (bc *BlockChain) setTxIndexTail(newTail uint64) error { +func (bc *BlockChain) repairTxIndexTail(newTail uint64) error { bc.txIndexTailLock.Lock() defer bc.txIndexTailLock.Unlock() - tailP := rawdb.ReadTxIndexTail(bc.db) - var tailV uint64 - if tailP != nil { - tailV = *tailP - } - - if newTail > tailV { - log.Info("Repairing tx index tail", "old", tailV, "new", newTail) + if curr := rawdb.ReadTxIndexTail(bc.db); curr == nil || *curr < newTail { + log.Info("Repairing tx index tail", "old", curr, "new", newTail) rawdb.WriteTxIndexTail(bc.db, newTail) } return nil diff --git a/core/txindexer.go b/core/txindexer.go index 85e492b06a..178f31d05f 100644 --- a/core/txindexer.go +++ b/core/txindexer.go @@ -85,12 +85,9 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer { // possible, the done channel will be closed once the task is finished. func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) { start := time.Now() - indexer.chain.txIndexTailLock.Lock() defer func() { txUnindexTimer.Inc(time.Since(start).Milliseconds()) - indexer.chain.txIndexTailLock.Unlock() close(done) - indexer.chain.wg.Done() }() // Short circuit if chain is empty and nothing to index. @@ -99,13 +96,15 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don } // Defensively ensure tail is not nil. - if tail == nil { - tail = new(uint64) + tailValue := uint64(0) + if tail != nil { + // use intermediate variable to avoid modifying the pointer + tailValue = *tail } - if head-indexer.limit+1 >= *tail { + if head-indexer.limit+1 >= tailValue { // Unindex a part of stale indices and forward index tail to HEAD-limit - rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false) + rawdb.UnindexTransactions(indexer.db, tailValue, head-indexer.limit+1, stop, false) } } @@ -113,19 +112,11 @@ func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, don // on the received chain event. func (indexer *txIndexer) loop(chain *BlockChain) { defer close(indexer.closed) - - // If the user just upgraded to a new version which supports transaction - // index pruning, write the new tail and remove anything older. - if rawdb.ReadTxIndexTail(indexer.db) == nil { - rawdb.WriteTxIndexTail(indexer.db, 0) - } - // Listening to chain events and manipulate the transaction indexes. var ( - stop chan struct{} // Non-nil if background routine is active. - done chan struct{} // Non-nil if background routine is active. - lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created) - lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed + stop chan struct{} // Non-nil if background routine is active. + done chan struct{} // Non-nil if background routine is active. + lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created) headCh = make(chan ChainEvent) sub = chain.SubscribeChainAcceptedEvent(headCh) @@ -145,7 +136,9 @@ func (indexer *txIndexer) loop(chain *BlockChain) { done = make(chan struct{}) lastHead = head.Number.Uint64() indexer.chain.wg.Add(1) - go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Number.Uint64(), stop, done) + go func() { + indexer.lockedRun(head.Number.Uint64(), stop, done) + }() } for { select { @@ -159,15 +152,16 @@ func (indexer *txIndexer) loop(chain *BlockChain) { stop = make(chan struct{}) done = make(chan struct{}) indexer.chain.wg.Add(1) - go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Block.NumberU64(), stop, done) + go func() { + indexer.lockedRun(headNum, stop, done) + }() } lastHead = head.Block.NumberU64() case <-done: stop = nil done = nil - lastTail = rawdb.ReadTxIndexTail(indexer.db) case ch := <-indexer.progress: - ch <- indexer.report(lastHead, lastTail) + ch <- indexer.report(lastHead, rawdb.ReadTxIndexTail(indexer.db)) case ch := <-indexer.term: if stop != nil { close(stop) @@ -213,3 +207,12 @@ func (indexer *txIndexer) close() { case <-indexer.closed: } } + +// lockedRun runs the indexing/unindexing task in a locked manner. It reads +// the current tail index from the database. +func (indexer *txIndexer) lockedRun(head uint64, stop chan struct{}, done chan struct{}) { + indexer.chain.txIndexTailLock.Lock() + indexer.run(rawdb.ReadTxIndexTail(indexer.db), head, stop, done) + indexer.chain.txIndexTailLock.Unlock() + indexer.chain.wg.Done() +}