Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, cmd/utils: when configured via flag, in block execution: prefetch all reads from account/storage tries #29807

Merged
merged 4 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ var (
utils.BeaconGenesisRootFlag,
utils.BeaconGenesisTimeFlag,
utils.BeaconCheckpointFlag,
utils.CollectWitnessFlag,
}, utils.NetworkFlags, utils.DatabaseFlags)

rpcFlags = []cli.Flag{
Expand Down
13 changes: 12 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,11 @@ var (
Usage: "Disables db compaction after import",
Category: flags.LoggingCategory,
}
CollectWitnessFlag = &cli.BoolFlag{
Name: "collectwitness",
Usage: "Enable state witness generation during block execution. Work in progress flag, don't use.",
Category: flags.MiscCategory,
}

// MISC settings
SyncTargetFlag = &cli.StringFlag{
Expand Down Expand Up @@ -1760,6 +1765,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.Bool(VMEnableDebugFlag.Name)
}
if ctx.IsSet(CollectWitnessFlag.Name) {
cfg.EnableWitnessCollection = ctx.Bool(CollectWitnessFlag.Name)
}

if ctx.IsSet(RPCGlobalGasCapFlag.Name) {
cfg.RPCGasCap = ctx.Uint64(RPCGlobalGasCapFlag.Name)
Expand Down Expand Up @@ -2190,7 +2198,10 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheGCFlag.Name) {
cache.TrieDirtyLimit = ctx.Int(CacheFlag.Name) * ctx.Int(CacheGCFlag.Name) / 100
}
vmcfg := vm.Config{EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name)}
vmcfg := vm.Config{
EnablePreimageRecording: ctx.Bool(VMEnableDebugFlag.Name),
EnableWitnessCollection: ctx.Bool(CollectWitnessFlag.Name),
}
if ctx.IsSet(VMTraceFlag.Name) {
if name := ctx.String(VMTraceFlag.Name); name != "" {
var config json.RawMessage
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
// while processing transactions. Before Byzantium the prefetcher is mostly
// useless due to the intermediate root hashing after each transaction.
if bc.chainConfig.IsByzantium(block.Number()) {
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", !bc.vmConfig.EnableWitnessCollection)
}
activeState = statedb

Expand Down
10 changes: 9 additions & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
}
value.SetBytes(val)
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
}
}
s.originStorage[key] = value
return value
}
Expand Down Expand Up @@ -293,7 +301,7 @@ func (s *stateObject) finalise() {
s.pendingStorage[key] = value
}
if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch); err != nil {
if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
}
}
Expand Down
16 changes: 12 additions & 4 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@ func (s *StateDB) SetLogger(l *tracing.Hooks) {
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, noreads bool) {
if s.prefetcher != nil {
s.prefetcher.terminate(false)
s.prefetcher.report()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, noreads)

// With the switch to the Proof-of-Stake consensus algorithm, block production
// rewards are now handled at the consensus layer. Consequently, a block may
Expand All @@ -218,7 +218,7 @@ func (s *StateDB) StartPrefetcher(namespace string) {
// To prevent this, the account trie is always scheduled for prefetching once
// the prefetcher is constructed. For more details, see:
// https://github.com/ethereum/go-ethereum/issues/29880
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
}
}
Expand Down Expand Up @@ -616,6 +616,14 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
return nil
}
}
// Independent of where we loaded the data from, add it to the prefetcher.
// Whilst this would be a bit weird if snapshots are disabled, but we still
// want the trie nodes to end up in the prefetcher too, so just push through.
if s.prefetcher != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
log.Error("Failed to prefetch account", "addr", addr, "err", err)
}
}
// Insert into the live set
obj := newObject(s, addr, data)
s.setStateObject(obj)
Expand Down Expand Up @@ -792,7 +800,7 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch); err != nil {
if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
}
}
Expand Down
162 changes: 116 additions & 46 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,49 @@ type triePrefetcher struct {
root common.Hash // Root hash of the account trie for metrics
fetchers map[string]*subfetcher // Subfetchers for each trie
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
accountWasteMeter metrics.Meter
storageLoadMeter metrics.Meter
storageDupMeter metrics.Meter
storageWasteMeter metrics.Meter

accountLoadReadMeter metrics.Meter
accountLoadWriteMeter metrics.Meter
accountDupReadMeter metrics.Meter
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter

storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
return &triePrefetcher{
db: db,
root: root,
fetchers: make(map[string]*subfetcher), // Active prefetchers use the fetchers map
term: make(chan struct{}),
noreads: noreads,

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
accountDupMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load", nil),
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),

storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
}

Expand Down Expand Up @@ -98,19 +116,31 @@ func (p *triePrefetcher) report() {
fetcher.wait() // ensure the fetcher's idle before poking in its internals

if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))

p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
delete(fetcher.seenRead, string(key))
delete(fetcher.seenWrite, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
}
}
}
Expand All @@ -126,7 +156,11 @@ func (p *triePrefetcher) report() {
// upon the same contract, the parameters invoking this method may be
// repeated.
// 2. Finalize of the main account trie. This happens only once per block.
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte) error {
func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
}
// Ensure the subfetcher is still alive
select {
case <-p.term:
Expand All @@ -139,7 +173,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
p.fetchers[id] = fetcher
}
return fetcher.schedule(keys)
return fetcher.schedule(keys, read)
}

// trie returns the trie matching the root hash, blocking until the fetcher of
Expand Down Expand Up @@ -186,38 +220,51 @@ type subfetcher struct {
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes

tasks [][]byte // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue

wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption

seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end
seenRead map[string]struct{} // Tracks the entries already loaded via read operations
seenWrite map[string]struct{} // Tracks the entries already loaded via write operations

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
dupsCross int // Number of duplicate preload tasks via read-write-crosses

used [][]byte // Tracks the entries used in the end
}

// subfetcherTask is a trie path to prefetch, tagged with whether it originates
// from a read or a write request.
type subfetcherTask struct {
read bool
key []byte
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seen: make(map[string]struct{}),
db: db,
state: state,
owner: owner,
root: root,
addr: addr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenRead: make(map[string]struct{}),
seenWrite: make(map[string]struct{}),
}
go sf.loop()
return sf
}

// schedule adds a batch of trie keys to the queue to prefetch.
func (sf *subfetcher) schedule(keys [][]byte) error {
func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
// Ensure the subfetcher is still alive
select {
case <-sf.term:
Expand All @@ -226,7 +273,10 @@ func (sf *subfetcher) schedule(keys [][]byte) error {
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, keys...)
for _, key := range keys {
key := key // closure for the append below
sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
}
sf.lock.Unlock()

// Notify the background thread to execute scheduled tasks
Expand Down Expand Up @@ -303,16 +353,36 @@ func (sf *subfetcher) loop() {
sf.lock.Unlock()

for _, task := range tasks {
if _, ok := sf.seen[string(task)]; ok {
sf.dups++
continue
key := string(task.key)
if task.read {
if _, ok := sf.seenRead[key]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsCross++
continue
}
} else {
if _, ok := sf.seenRead[key]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWrite[key]; ok {
sf.dupsWrite++
continue
}
}
if len(task.key) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task.key))
} else {
sf.trie.GetStorage(sf.addr, task.key)
}
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
if task.read {
sf.seenRead[key] = struct{}{}
} else {
sf.trie.GetStorage(sf.addr, task)
sf.seenWrite[key] = struct{}{}
}
sf.seen[string(task)] = struct{}{}
}

case <-sf.stop:
Expand Down
6 changes: 3 additions & 3 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func filledStateDB() *StateDB {

func TestUseAfterTerminate(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
skey := common.HexToHash("aaa")

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err != nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
prefetcher.terminate(false)

if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}); err == nil {
if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
}
if tr := prefetcher.trie(common.Hash{}, db.originalRoot); tr == nil {
Expand Down
1 change: 1 addition & 0 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Config struct {
NoBaseFee bool // Forces the EIP-1559 baseFee to 0 (needed for 0 price calls)
EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages
ExtraEips []int // Additional EIPS that are to be enabled
EnableWitnessCollection bool // true if witness collection is enabled
}

// ScopeContext contains the things that are per-call, such as stack and memory,
Expand Down
Loading
Loading