fix: add added blocks to ChainEvent if reorg happened
DNK90 committed Mar 29, 2023
1 parent b1f9b34 commit 41abbca
Showing 10 changed files with 241 additions and 62 deletions.
1 change: 1 addition & 0 deletions cmd/ronin/main.go
@@ -160,6 +160,7 @@ var (
configFileFlag,
utils.CatalystFlag,
utils.MonitorDoubleSign,
utils.StoreInternalTransactions,
}

rpcFlags = []cli.Flag{
1 change: 1 addition & 0 deletions cmd/ronin/usage.go
@@ -56,6 +56,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.WhitelistFlag,
utils.ForceOverrideChainConfigFlag,
utils.MonitorDoubleSign,
utils.StoreInternalTransactions,
},
},
{
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
@@ -235,6 +235,10 @@ var (
Name: "monitor.doublesign",
Usage: "Enable double sign monitoring",
}
StoreInternalTransactions = cli.BoolFlag{
Name: "internaltxs",
Usage: "Enable storing internal transactions to db",
}
LightKDFFlag = cli.BoolFlag{
Name: "lightkdf",
Usage: "Reduce key-derivation RAM & CPU usage at some expense of KDF strength",
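This diff only declares the new flag (passed on the command line as --internaltxs); the code that consumes it is not visible in this commit. A minimal wiring sketch under stated assumptions: it uses urfave/cli v1 (which cmd/utils already depends on), and it assumes a rawdb.WriteStoreInternalTransactionsEnabled helper exists as the write-side counterpart of the rawdb.ReadStoreInternalTransactionsEnabled accessor used later in core/blockchain_reader.go. Both the function name and the call site are hypothetical.

package main

import (
	"github.com/ethereum/go-ethereum/cmd/utils"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"gopkg.in/urfave/cli.v1"
)

// setupInternalTxStorage is a hypothetical sketch, not part of this commit's
// visible diff. It persists the CLI toggle so that the blockchain can later
// consult it via rawdb.ReadStoreInternalTransactionsEnabled on every import.
func setupInternalTxStorage(ctx *cli.Context, db ethdb.Database) {
	if ctx.GlobalBool(utils.StoreInternalTransactions.Name) {
		// Assumed helper: writes a marker key so the setting survives restarts.
		rawdb.WriteStoreInternalTransactionsEnabled(db)
	}
}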
78 changes: 49 additions & 29 deletions core/blockchain.go
@@ -85,13 +85,15 @@ var (
)

const (
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
bodyCacheLimit = 256
blockCacheLimit = 256
receiptsCacheLimit = 32
txLookupCacheLimit = 1024
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
TriesInMemory = 128
dirtyAccountsCacheLimit = 32
internalTxsCacheLimit = 32

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
//
@@ -196,13 +198,15 @@ type BlockChain struct {
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)

stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
txLookupCache *lru.Cache // Cache for the most recent transaction lookup data.
futureBlocks *lru.Cache // future blocks are blocks added for later processing
dirtyAccountsCache *lru.Cache // Cache for the most recent dirtyAccounts
internalTransactionsCache *lru.Cache // Cache for most recent internal transactions with block hash at key

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
@@ -231,6 +235,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
blockCache, _ := lru.New(blockCacheLimit)
txLookupCache, _ := lru.New(txLookupCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
dirtyAccountsCache, _ := lru.New(dirtyAccountsCacheLimit)
internalTxsCache, _ := lru.New(internalTxsCacheLimit)

bc := &BlockChain{
chainConfig: chainConfig,
@@ -242,17 +248,19 @@
Journal: cacheConfig.TrieCleanJournal,
Preimages: cacheConfig.Preimages,
}),
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
quit: make(chan struct{}),
chainmu: syncx.NewClosableMutex(),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
txLookupCache: txLookupCache,
dirtyAccountsCache: dirtyAccountsCache,
internalTransactionsCache: internalTxsCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
@@ -1669,8 +1677,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
return it.index, err
}

// send internalTxs events
// Store internal txs in the database and send them to internalTxFeed
if len(internalTxs) > 0 {
bc.WriteInternalTransactions(block.Hash(), internalTxs)
bc.internalTxFeed.Send(internalTxs)
}

@@ -1712,6 +1721,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er

if dirtyAccounts != nil {
bc.dirtyAccountFeed.Send(dirtyAccounts)
bc.dirtyAccountsCache.Add(block.Hash(), dirtyAccounts)
}

// Update the metrics touched during block commit
@@ -1915,10 +1925,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// collectLogs collects the logs that were generated or removed during
// the processing of the block that corresponds with the given hash.
// These logs are later announced as deleted or reborn
collectLogs = func(hash common.Hash, removed bool) {
collectLogs = func(hash common.Hash, removed bool) (types.Receipts, []*types.Log) {
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
return nil, nil
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number, bc.chainConfig)

@@ -1939,6 +1949,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
rebirthLogs = append(rebirthLogs, logs)
}
}
return receipts, logs
}
// mergeLogs returns a merged log slice with specified sort order.
mergeLogs = func(logs [][]*types.Log, reverse bool) []*types.Log {
@@ -2023,7 +2034,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
bc.writeHeadBlock(newChain[i])

// Collect reborn logs due to chain reorg
collectLogs(newChain[i].Hash(), false)
receipts, logs := collectLogs(newChain[i].Hash(), false)

// Get the dirty accounts recorded for this block
dirtyAccounts := bc.ReadDirtyAccounts(newChain[i].Hash())

// Get the internal transactions recorded for this block
internalTxs := bc.ReadInternalTransactions(newChain[i].Hash())

log.Info("send new block event due to reorg", "height", newChain[i].NumberU64(), "txs", len(newChain[i].Transactions()), "logs", len(logs))
bc.chainFeed.Send(ChainEvent{Block: newChain[i], Hash: newChain[i].Hash(), Logs: logs, InternalTxs: internalTxs, DirtyAccounts: dirtyAccounts, Receipts: receipts})

// Collect the new added transactions.
addedTxs = append(addedTxs, newChain[i].Transactions()...)
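The effect of the reorg change above is that blocks re-added to the canonical chain now emit a full ChainEvent, carrying logs, internal transactions, dirty accounts and receipts, instead of being skipped. A minimal sketch of a subscriber that relies on this, using the field names visible in the diff; the channel buffer size and logging are illustrative only.

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/core"
)

// watchChain is a minimal consumer sketch. After this fix, ChainEvents for
// blocks re-added during a reorg also carry InternalTxs, DirtyAccounts and
// Receipts, so an indexer driven by this loop no longer misses them.
func watchChain(bc *core.BlockChain) {
	events := make(chan core.ChainEvent, 16) // buffer size is illustrative
	sub := bc.SubscribeChainEvent(events)
	defer sub.Unsubscribe()

	for {
		select {
		case ev := <-events:
			log.Printf("block %d (%s): %d logs, %d internal txs, %d dirty accounts",
				ev.Block.NumberU64(), ev.Hash.Hex(),
				len(ev.Logs), len(ev.InternalTxs), len(ev.DirtyAccounts))
		case err := <-sub.Err():
			log.Printf("chain subscription ended: %v", err)
			return
		}
	}
}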
92 changes: 64 additions & 28 deletions core/blockchain_reader.go
@@ -400,6 +400,38 @@ func (bc *BlockChain) SubscribeDirtyAccountEvent(ch chan<- []*types.DirtyStateAc
return bc.scope.Track(bc.dirtyAccountFeed.Subscribe(ch))
}

func (bc *BlockChain) WriteInternalTransactions(hash common.Hash, internalTxs []*types.InternalTransaction) {
// cache first
bc.internalTransactionsCache.Add(hash, internalTxs)

// Only persist to the database if storing internal transactions is enabled
if !rawdb.ReadStoreInternalTransactionsEnabled(bc.db) {
return
}
rawdb.WriteInternalTransactions(bc.db, hash, internalTxs)
}

func (bc *BlockChain) ReadInternalTransactions(hash common.Hash) []*types.InternalTransaction {
// get internal txs from cache
if internalTxs, exist := bc.internalTransactionsCache.Get(hash); exist {
return internalTxs.([]*types.InternalTransaction)
}
// otherwise get from db
internalTxs := rawdb.ReadInternalTransactions(bc.db, hash)
if internalTxs == nil {
return nil
}
bc.internalTransactionsCache.Add(hash, internalTxs)
return internalTxs
}

func (bc *BlockChain) ReadDirtyAccounts(hash common.Hash) []*types.DirtyStateAccount {
if dirtyAccount, _ := bc.dirtyAccountsCache.Get(hash); dirtyAccount != nil {
return dirtyAccount.([]*types.DirtyStateAccount)
}
return nil
}

func (bc *BlockChain) OpEvents() []*vm.PublishEvent {
return []*vm.PublishEvent{
{
@@ -427,20 +459,22 @@ func (tx *InternalTransferOrSmcCallEvent) Publish(
err error,
) *types.InternalTransaction {
internal := &types.InternalTransaction{
Opcode: opcode.String(),
Order: order,
TransactionHash: hash,
Type: types.InternalTransactionContractCall,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Success: err == nil,
Error: "",
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
Opcode: opcode.String(),
Type: types.InternalTransactionContractCall,
Success: err == nil,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: order,
TransactionHash: hash,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
},
}
if value != nil && value.Cmp(big.NewInt(0)) > 0 && (input == nil || len(input) == 0) {
internal.Type = types.InternalTransactionTransfer
@@ -465,20 +499,22 @@ func (tx *InternalTransactionContractCreation) Publish(
err error,
) *types.InternalTransaction {
internal := &types.InternalTransaction{
Opcode: opcode.String(),
Order: order,
TransactionHash: hash,
Type: types.InternalTransactionContractCreation,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Success: err == nil,
Error: "",
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
Opcode: opcode.String(),
Type: types.InternalTransactionContractCreation,
Success: err == nil,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: order,
TransactionHash: hash,
Value: value,
Input: input,
Output: output,
From: from,
To: to,
Height: blockHeight,
BlockHash: blockHash,
BlockTime: blockTime,
},
}
if err != nil {
internal.Error = err.Error()
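Both Publish methods above are reshaped so the positional fields move into a nested InternalTransactionBody, which the test at the bottom of this commit also hashes via a Hash() method not shown here. The type definitions live in core/types and are not part of this diff; a plausible shape, inferred from the field names and literal values used in this commit (the real definitions in the Ronin codebase may carry extra fields, RLP tags, or named string types for Type):

package types

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// Inferred sketch only: field names come from this commit, field types are
// deduced from how they are assigned in the diff and the accompanying test.
type InternalTransactionBody struct {
	Order           uint64
	TransactionHash common.Hash
	Value           *big.Int
	Input           []byte
	Output          []byte
	From            common.Address
	To              common.Address
	Height          uint64
	BlockHash       common.Hash
	BlockTime       uint64
}

type InternalTransaction struct {
	Opcode  string
	Type    string
	Success bool
	Error   string
	// Embedded so callers can keep reading tx.From, tx.Value, etc., while the
	// body can be encoded or hashed on its own.
	*InternalTransactionBody
}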
26 changes: 26 additions & 0 deletions core/rawdb/accessors_chain.go
@@ -470,6 +470,32 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
}
}

// ReadInternalTransactions retrieves the internal transactions corresponding to the hash.
func ReadInternalTransactions(db ethdb.Reader, hash common.Hash) []*types.InternalTransaction {
// Read the RLP-encoded internal transactions from the database
data, _ := db.Get(internalTxsKey(hash))
if len(data) == 0 {
return nil
}
var internalTxs []*types.InternalTransaction
if err := rlp.Decode(bytes.NewReader(data), &internalTxs); err != nil {
log.Error("Invalid internal transactions RLP", "hash", hash, "err", err)
return nil
}
return internalTxs
}

// WriteInternalTransactions stores internal transactions into the database.
func WriteInternalTransactions(db ethdb.KeyValueWriter, hash common.Hash, internalTxs []*types.InternalTransaction) {
data, err := rlp.EncodeToBytes(internalTxs)
if err != nil {
log.Crit("Failed to RLP encode internal txs", "err", err)
}
if err := db.Put(internalTxsKey(hash), data); err != nil {
log.Crit("Failed to store internal txs", "err", err)
}
}

// ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
var data []byte
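Both accessors rely on an internalTxsKey helper that is defined elsewhere (presumably core/rawdb/schema.go) and is not part of this diff. Following the pattern of the existing rawdb key helpers, it plausibly looks like the sketch below; the prefix bytes are an assumption, not the actual schema value.

package rawdb

import "github.com/ethereum/go-ethereum/common"

// internalTxsPrefix is a placeholder value; the real prefix lives in the Ronin
// schema and is not visible in this commit.
var internalTxsPrefix = []byte("itx")

// internalTxsKey = internalTxsPrefix + block hash, mirroring helpers such as
// blockBodyKey and blockReceiptsKey (which additionally encode the number).
func internalTxsKey(hash common.Hash) []byte {
	return append(internalTxsPrefix, hash.Bytes()...)
}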
61 changes: 61 additions & 0 deletions core/rawdb/accessors_chain_test.go
@@ -26,6 +26,7 @@ import (
"os"
"reflect"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -883,3 +884,63 @@ func BenchmarkDecodeRLPLogs(b *testing.B) {
}
})
}

func TestReadWriteInternalTransactions(t *testing.T) {
db := NewMemoryDatabase()
blockTime := time.Now().Unix()
blockHash := common.HexToHash("0x3")
internalTxs := []*types.InternalTransaction{
{
Opcode: "CALL",
Type: "call",
Success: true,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: 1,
TransactionHash: common.HexToHash("0x4"),
Value: nil,
Input: nil,
Output: nil,
From: common.HexToAddress("0x1"),
To: common.HexToAddress("0x2"),
Height: 100,
BlockHash: blockHash,
BlockTime: uint64(blockTime),
},
},
{
Opcode: "CALL",
Type: "call",
Success: true,
Error: "",
InternalTransactionBody: &types.InternalTransactionBody{
Order: 2,
TransactionHash: common.HexToHash("0x4"),
Value: nil,
Input: nil,
Output: nil,
From: common.HexToAddress("0x3"),
To: common.HexToAddress("0x4"),
Height: 100,
BlockHash: blockHash,
BlockTime: uint64(blockTime),
},
},
}
WriteInternalTransactions(db, blockHash, internalTxs)
results := ReadInternalTransactions(db, blockHash)
if results == nil {
t.Fatal("no internal transactions found at hash")
}
if len(results) != len(internalTxs) {
t.Fatalf("mismatched length between input and output internal transactions, expected %d got %d", len(internalTxs), len(results))
}
for i, tx := range internalTxs {
if tx.Hash().Hex() != results[i].Hash().Hex() {
t.Fatalf("mismatched hash at index %d, expected %s got %s", i, tx.Hash().Hex(), results[i].Hash().Hex())
}
if tx.Opcode != results[i].Opcode {
t.Fatalf("mismatched Opcode at index %d, expected %s got %s", i, tx.Opcode, results[i].Opcode)
}
}
}
