Skip to content

Commit

Permalink
Block Acceptor from Flattening during Verification (#330)
Browse files Browse the repository at this point in the history
* add more timing metrics

* nits

* add trie ops

* migrate to counters

* add logging for trie fallback

* nits

* log correct error

* limit error logging

* fix acceptor

* add logging for loading cache

* nits
  • Loading branch information
patrick-ogrady authored Nov 2, 2022
1 parent bc4f2f6 commit 59a40c1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 41 deletions.
115 changes: 74 additions & 41 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,34 @@ import (
)

var (
accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredTimer("chain/account/updates", nil)
accountCommitTimer = metrics.NewRegisteredTimer("chain/account/commits", nil)
storageReadTimer = metrics.NewRegisteredTimer("chain/storage/reads", nil)
storageHashTimer = metrics.NewRegisteredTimer("chain/storage/hashes", nil)
storageUpdateTimer = metrics.NewRegisteredTimer("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredTimer("chain/storage/commits", nil)
snapshotAccountReadTimer = metrics.NewRegisteredTimer("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredTimer("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredTimer("chain/triedb/commits", nil)

blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil)
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)
accountReadTimer = metrics.NewRegisteredCounter("chain/account/reads", nil)
accountHashTimer = metrics.NewRegisteredCounter("chain/account/hashes", nil)
accountUpdateTimer = metrics.NewRegisteredCounter("chain/account/updates", nil)
accountCommitTimer = metrics.NewRegisteredCounter("chain/account/commits", nil)
storageReadTimer = metrics.NewRegisteredCounter("chain/storage/reads", nil)
storageHashTimer = metrics.NewRegisteredCounter("chain/storage/hashes", nil)
storageUpdateTimer = metrics.NewRegisteredCounter("chain/storage/updates", nil)
storageCommitTimer = metrics.NewRegisteredCounter("chain/storage/commits", nil)
snapshotAccountReadTimer = metrics.NewRegisteredCounter("chain/snapshot/account/reads", nil)
snapshotStorageReadTimer = metrics.NewRegisteredCounter("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.NewRegisteredCounter("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredCounter("chain/triedb/commits", nil)

blockInsertTimer = metrics.NewRegisteredCounter("chain/block/inserts", nil)
blockInsertCount = metrics.NewRegisteredCounter("chain/block/inserts/count", nil)
blockContentValidationTimer = metrics.NewRegisteredCounter("chain/block/validations/content", nil)
blockStateInitTimer = metrics.NewRegisteredCounter("chain/block/inits/state", nil)
blockExecutionTimer = metrics.NewRegisteredCounter("chain/block/executions", nil)
blockTrieOpsTimer = metrics.NewRegisteredCounter("chain/block/trie", nil)
blockStateValidationTimer = metrics.NewRegisteredCounter("chain/block/validations/state", nil)
blockWriteTimer = metrics.NewRegisteredCounter("chain/block/writes", nil)

acceptorQueueGauge = metrics.NewRegisteredGauge("chain/acceptor/queue/size", nil)
processedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/blocks/gas/used/processed", nil)
acceptedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/blocks/gas/used/accepted", nil)
badBlockCounter = metrics.NewRegisteredCounter("chain/blocks/bad/count", nil)
acceptorWorkTimer = metrics.NewRegisteredCounter("chain/acceptor/work", nil)
acceptorWorkCount = metrics.NewRegisteredCounter("chain/acceptor/work/count", nil)
processedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/block/gas/used/processed", nil)
acceptedBlockGasUsedCounter = metrics.NewRegisteredCounter("chain/block/gas/used/accepted", nil)
badBlockCounter = metrics.NewRegisteredCounter("chain/block/bad/count", nil)

ErrRefuseToCorruptArchiver = errors.New("node has operated with pruning disabled, shutting down to prevent missing tries")

Expand Down Expand Up @@ -263,6 +269,10 @@ type BlockChain struct {
// processed blocks. This may be equal to [lastAccepted].
acceptorTip *types.Block
acceptorTipLock sync.Mutex

// [flattenLock] prevents the [acceptor] from flattening snapshots while
// a block is being verified.
flattenLock sync.Mutex
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -410,6 +420,12 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha
return err
}

// Ensure we avoid flattening the snapshot while we are processing a block, or
// block execution will fall back to reading from the trie (which is much
// slower).
bc.flattenLock.Lock()
defer bc.flattenLock.Unlock()

// Flatten the entire snap Trie to disk
//
// Note: This resumes snapshot generation.
Expand All @@ -422,6 +438,7 @@ func (bc *BlockChain) startAcceptor() {
log.Info("Starting Acceptor", "queue length", bc.cacheConfig.AcceptorQueueLimit)

for next := range bc.acceptorQueue {
start := time.Now()
acceptorQueueGauge.Dec(1)

if err := bc.flattenSnapshot(func() error {
Expand Down Expand Up @@ -451,6 +468,9 @@ func (bc *BlockChain) startAcceptor() {
bc.acceptorTip = next
bc.acceptorTipLock.Unlock()
bc.acceptorWg.Done()

acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
acceptorWorkCount.Inc(1)
}
}

Expand Down Expand Up @@ -1090,13 +1110,14 @@ func (bc *BlockChain) gatherBlockLogs(hash common.Hash, number uint64, removed b
}

func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
start := time.Now()
bc.senderCacher.Recover(types.MakeSigner(bc.chainConfig, block.Number(), new(big.Int).SetUint64(block.Time())), block.Transactions())

substart := time.Now()
err := bc.engine.VerifyHeader(bc, block.Header())
if err == nil {
err = bc.validator.ValidateBody(block)
}

switch {
case errors.Is(err, ErrKnownBlock):
// even if the block is already known, we still need to generate the
Expand All @@ -1123,6 +1144,8 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
bc.reportBlock(block, nil, err)
return err
}
blockContentValidationTimer.Inc(time.Since(substart).Milliseconds())

// No validation errors for the block
var activeState *state.StateDB
defer func() {
Expand All @@ -1135,13 +1158,21 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
}
}()

// Retrieve the parent block and its state to execute on top
start := time.Now()
// Retrieve the parent block to determine which root to build state on
substart = time.Now()
parent := bc.GetHeader(block.ParentHash(), block.NumberU64()-1)

// Instantiate the statedb to use for processing transactions
//
// NOTE: Flattening a snapshot during block execution requires fetching state
// entries directly from the trie (much slower).
bc.flattenLock.Lock()
defer bc.flattenLock.Unlock()
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return err
}
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
Expand All @@ -1150,7 +1181,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
// Process block using the parent state as reference point
substart := time.Now()
substart = time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, parent, statedb, bc.vmConfig)
if serr := statedb.Error(); serr != nil {
log.Error("statedb error encountered", "err", serr, "number", block.Number(), "hash", block.Hash())
Expand All @@ -1161,16 +1192,17 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
}

// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete, we can mark them
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete, we can mark them
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete, we can mark them
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) // Storage reads are complete, we can mark them
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
accountReadTimer.Inc(statedb.AccountReads.Milliseconds()) // Account reads are complete, we can mark them
storageReadTimer.Inc(statedb.StorageReads.Milliseconds()) // Storage reads are complete, we can mark them
accountUpdateTimer.Inc(statedb.AccountUpdates.Milliseconds()) // Account updates are complete, we can mark them
storageUpdateTimer.Inc(statedb.StorageUpdates.Milliseconds()) // Storage updates are complete, we can mark them
snapshotAccountReadTimer.Inc(statedb.SnapshotAccountReads.Milliseconds()) // Account reads are complete, we can mark them
snapshotStorageReadTimer.Inc(statedb.SnapshotStorageReads.Milliseconds()) // Storage reads are complete, we can mark them
triehash := statedb.AccountHashes + statedb.StorageHashes // Save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
blockExecutionTimer.Inc((time.Since(substart) - trieproc - triehash).Milliseconds())
blockTrieOpsTimer.Inc((trieproc + triehash).Milliseconds())

// Validate the state using the default validator
substart = time.Now()
Expand All @@ -1180,9 +1212,9 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
}

// Update the metrics touched during block validation
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete, we can mark them
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete, we can mark them
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
accountHashTimer.Inc(statedb.AccountHashes.Milliseconds()) // Account hashes are complete, we can mark them
storageHashTimer.Inc(statedb.StorageHashes.Milliseconds()) // Storage hashes are complete, we can mark them
blockStateValidationTimer.Inc((time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)).Milliseconds())

// If [writes] are disabled, skip [writeBlockWithState] so that we do not write the block
// or the state trie to disk.
Expand All @@ -1201,12 +1233,12 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
}

// Update the metrics touched during block commit
accountCommitTimer.Update(statedb.AccountCommits) // Account commits are complete, we can mark them
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Triedb commits are complete, we can mark them
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)
accountCommitTimer.Inc(statedb.AccountCommits.Milliseconds()) // Account commits are complete, we can mark them
storageCommitTimer.Inc(statedb.StorageCommits.Milliseconds()) // Storage commits are complete, we can mark them
snapshotCommitTimer.Inc(statedb.SnapshotCommits.Milliseconds()) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Inc(statedb.TrieDBCommits.Milliseconds()) // Triedb commits are complete, we can mark them
blockWriteTimer.Inc((time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits).Milliseconds())
blockInsertTimer.Inc(time.Since(start).Milliseconds())

log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"parentHash", block.ParentHash(),
Expand All @@ -1216,6 +1248,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
)

processedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
blockInsertCount.Inc(1)
return nil
}

Expand Down
20 changes: 20 additions & 0 deletions utils/metered_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ package utils

import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/VictoriaMetrics/fastcache"
"github.com/ava-labs/subnet-evm/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

// MeteredCache wraps *fastcache.Cache and periodically pulls stats from it.
Expand All @@ -31,6 +35,20 @@ type MeteredCache struct {
updateFrequency uint64
}

func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += info.Size()
}
return nil
})
return size, err
}

// NewMeteredCache returns a new MeteredCache that will update stats to the
// provided namespace once per each [updateFrequency] operations.
// Note: if [updateFrequency] is passed as 0, it will be treated as 1.
Expand All @@ -39,6 +57,8 @@ func NewMeteredCache(size int, journal string, namespace string, updateFrequency
if journal == "" {
cache = fastcache.New(size)
} else {
dirSize, err := dirSize(journal)
log.Info("attempting to load cache from disk", "path", journal, "dirSize", common.StorageSize(dirSize), "err", err)
cache = fastcache.LoadFromFileOrNew(journal, size)
}
if updateFrequency == 0 {
Expand Down

0 comments on commit 59a40c1

Please sign in to comment.