Skip to content

Commit

Permalink
fix: missing reorged blocks (#250)
Browse files Browse the repository at this point in the history
* fix: add `added blocks` to ChainEvent if reorg happened

* chore: fix unit test failed

* chore: move read/write store internal transaction flag to `accessors_chain`

* chore: add `shouldStoreInternalTxs` into blockchain and use it to check if internal txs should be stored into db or not

* chore: backup dirty accounts to db when blockchain is stop

* chore: change struct of storing latest dirty accounts to make sure canonical or sidechain blockhash can be found
  • Loading branch information
DNK90 authored Apr 3, 2023
1 parent 24e2aa2 commit bf13c3e
Show file tree
Hide file tree
Showing 12 changed files with 408 additions and 78 deletions.
4 changes: 4 additions & 0 deletions cmd/ronin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/rawdb"
"math/big"
"os"
"reflect"
Expand Down Expand Up @@ -161,6 +162,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// store internal transaction is enabled flag
rawdb.WriteStoreInternalTransactionsEnabled(backend.ChainDb(), ctx.GlobalBool(utils.StoreInternalTransactions.Name))

// Configure catalyst.
if ctx.GlobalBool(utils.CatalystFlag.Name) {
if eth == nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ var (
configFileFlag,
utils.CatalystFlag,
utils.MonitorDoubleSign,
utils.StoreInternalTransactions,
}

rpcFlags = []cli.Flag{
Expand Down
1 change: 1 addition & 0 deletions cmd/ronin/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.WhitelistFlag,
utils.ForceOverrideChainConfigFlag,
utils.MonitorDoubleSign,
utils.StoreInternalTransactions,
},
},
{
Expand Down
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ var (
Name: "monitor.doublesign",
Usage: "Enable double sign monitoring",
}
StoreInternalTransactions = cli.BoolFlag{
Name: "internaltxs",
Usage: "Enable storing internal transactions to db",
}
LightKDFFlag = cli.BoolFlag{
Name: "lightkdf",
Usage: "Reduce key-derivation RAM & CPU usage at some expense of KDF strength",
Expand Down
108 changes: 77 additions & 31 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ var (
)

const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
dirtyAccountsCacheLimit = 32
internalTxsCacheLimit = 32

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
Expand Down Expand Up @@ -196,13 +198,15 @@ type BlockChain struct {
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
dirtyAccountsCache *lru.Cache // Cache for the most recent dirtyAccounts
internalTransactionsCache *lru.Cache // Cache for most recent internal transactions with block hash at key

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
Expand All @@ -215,7 +219,8 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
shouldStoreInternalTxs bool
}

// NewBlockChain returns a fully initialised block chain using information
Expand All @@ -231,6 +236,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
blockCache, _ := lru.New(blockCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
dirtyAccountsCache, _ := lru.New(dirtyAccountsCacheLimit)
internalTxsCache, _ := lru.New(internalTxsCacheLimit)

bc := &BlockChain{
chainConfig: chainConfig,
Expand All @@ -242,17 +249,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
}),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
dirtyAccountsCache: dirtyAccountsCache,
internalTransactionsCache: internalTxsCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
shouldStoreInternalTxs: rawdb.ReadStoreInternalTransactionsEnabled(db),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
Expand Down Expand Up @@ -406,9 +416,20 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
triedb.SaveCachePeriodically(bc.cacheConfig.TrieCleanJournal, bc.cacheConfig.TrieCleanRejournal, bc.quit)
}()
}

// load the latest dirty accounts stored from last stop to cache
bc.loadLatestDirtyAccounts()

return bc, nil
}

func (bc *BlockChain) loadLatestDirtyAccounts() {
dirtyStateAccounts := rawdb.ReadDirtyAccounts(bc.db)
for _, data := range dirtyStateAccounts {
bc.dirtyAccountsCache.Add(data.BlockHash, data.DirtyAccounts)
}
}

func (bc *BlockChain) StartDoubleSignMonitor() {
log.Info("Starting double sign monitor")
doubleSignMonitor, err := monitor.NewDoubleSignMonitor()
Expand Down Expand Up @@ -821,6 +842,19 @@ func (bc *BlockChain) Stop() {
// Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()

// store cached dirty accounts to db
dirtyStateAccounts := make([]*types.DirtyStateAccountsAndBlock, 0)
for _, blockHash := range bc.dirtyAccountsCache.Keys() {
dirtyAccounts, _ := bc.dirtyAccountsCache.Get(blockHash)
dirtyStateAccounts = append(dirtyStateAccounts, &types.DirtyStateAccountsAndBlock{
BlockHash: blockHash.(common.Hash),
DirtyAccounts: dirtyAccounts.([]*types.DirtyStateAccount),
})
}
if len(dirtyStateAccounts) > 0 {
rawdb.WriteDirtyAccounts(bc.db, dirtyStateAccounts)
}

// Ensure that the entirety of the state snapshot is journalled to disk.
var snapBase common.Hash
if bc.snaps != nil {
Expand Down Expand Up @@ -1448,7 +1482,7 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
return bc.insertChain(types.Blocks([]*types.Block{block}), false)
return bc.insertChain([]*types.Block{block}, false)
}

// insertChain is the internal implementation of InsertChain, which assumes that
Expand Down Expand Up @@ -1669,8 +1703,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}

// send internalTxs events
// store internal txs to db and send them to internalTxFeed
if len(internalTxs) > 0 {
bc.WriteInternalTransactions(block.Hash(), internalTxs)
bc.internalTxFeed.Send(internalTxs)
}

Expand Down Expand Up @@ -1712,6 +1747,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

if dirtyAccounts != nil {
bc.dirtyAccountFeed.Send(dirtyAccounts)
bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts)
}

// Update the metrics touched during block commit
Expand Down Expand Up @@ -1915,10 +1951,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// collectLogs collects the logs that were generated or removed during
// the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash, removed bool) {
collectLogs = func(hash common.Hash, removed bool) (types.Receipts, []*types.Log) {
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
return nil, nil
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)

Expand All @@ -1939,6 +1975,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
rebirthLogs = append(rebirthLogs, logs)
}
}
return receipts, logs
}
// mergeLogs returns a merged log slice with specified sort order.
mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
Expand Down Expand Up @@ -2023,7 +2060,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
bc.writeHeadBlock(newChain[i])

// Collect reborn logs due to chain reorg
collectLogs(newChain[i].Hash(), false)
receipts, logs := collectLogs(newChain[i].Hash(), false)

// get dirty accounts
dirtyAccounts := bc.ReadDirtyAccounts(newChain[i].Hash())

// get internal transactions
internalTxs := bc.ReadInternalTransactions(newChain[i].Hash())

log.Info("send new block event due to reorg", "height", newChain[i].NumberU64(), "txs", len(newChain[i].Transactions()), "logs", len(logs))
bc.chainFeed.Send(ChainEvent{Block: newChain[i], Hash: newChain[i].Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts})

// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
Expand Down
92 changes: 64 additions & 28 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,38 @@ func (bc *BlockChain) SubscribeDirtyAccountEvent(ch chan<- []*types.DirtyStateAc
return bc.scope.Track(bc.dirtyAccountFeed.Subscribe(ch))
}

func (bc *BlockChain) WriteInternalTransactions(hash common.Hash, internalTxs []*types.InternalTransaction) {
// cache first
bc.internalTransactionsCache.Add(hash, internalTxs)

// check if store internal transactions is enabled or not
if !bc.shouldStoreInternalTxs {
return
}
rawdb.WriteInternalTransactions(bc.db, hash, internalTxs)
}

func (bc *BlockChain) ReadInternalTransactions(hash common.Hash) []*types.InternalTransaction {
// get internal txs from cache
if internalTxs, exist := bc.internalTransactionsCache.Get(hash); exist {
return internalTxs.([]*types.InternalTransaction)
}
// otherwise get from db
internalTxs := rawdb.ReadInternalTransactions(bc.db, hash)
if internalTxs == nil {
return nil
}
bc.internalTransactionsCache.Add(hash, internalTxs)
return internalTxs
}

func (bc *BlockChain) ReadDirtyAccounts(hash common.Hash) []*types.DirtyStateAccount {
if dirtyAccount, _ := bc.dirtyAccountsCache.Get(hash); dirtyAccount != nil {
return dirtyAccount.([]*types.DirtyStateAccount)
}
return nil
}

func (bc *BlockChain) OpEvents() []*vm.PublishEvent {
return []*vm.PublishEvent{
{
Expand Down Expand Up @@ -427,20 +459,22 @@ func (tx *InternalTransferOrSmcCallEvent) Publish(
err error,
) *types.InternalTransaction {
internal := &types.InternalTransaction{
Opcode: opcode.String(),
Order: order,
TransactionHash: hash,
Type: types.InternalTransactionContractCall,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Success: err == nil,
Error: "",
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
Opcode: opcode.String(),
Type: types.InternalTransactionContractCall,
Success: err == nil,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: order,
TransactionHash: hash,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
},
}
if value != nil && value.Cmp(big.NewInt(0)) > 0 && (input == nil || len(input) == 0) {
internal.Type = types.InternalTransactionTransfer
Expand All @@ -465,20 +499,22 @@ func (tx *InternalTransactionContractCreation) Publish(
err error,
) *types.InternalTransaction {
internal := &types.InternalTransaction{
Opcode: opcode.String(),
Order: order,
TransactionHash: hash,
Type: types.InternalTransactionContractCreation,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Success: err == nil,
Error: "",
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
Opcode: opcode.String(),
Type: types.InternalTransactionContractCreation,
Success: err == nil,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: order,
TransactionHash: hash,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
},
}
if err != nil {
internal.Error = err.Error()
Expand Down
Loading

0 comments on commit bf13c3e

Please sign in to comment.