Skip to content

Commit

Permalink
cmd/geth, core/state/snapshot: rework journal loading, implement acco…
Browse files Browse the repository at this point in the history
…unt-check (ethereum#24765)

* cmd/geth, core/state/snapshot: rework journal loading, implement account-check

* core/state/snapshot, cmd/geth: polish code (#37)

* core/state/snapshot: minor nits

* core/state/snapshot: simplify error logic

* cmd/geth: go format

Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
  • Loading branch information
holiman and rjl493456442 authored Jun 6, 2022
1 parent d6b5574 commit c375ee9
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 151 deletions.
44 changes: 44 additions & 0 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ In other words, this command does the snapshot to trie conversion.
Description: `
geth snapshot check-dangling-storage <state-root> traverses the snap storage
data, and verifies that all snapshot storage data has a corresponding account.
`,
},
{
Name: "inspect-account",
Usage: "Check all snapshot layers for the a specific account",
ArgsUsage: "<address | hash>",
Action: utils.MigrateFlags(checkAccount),
Category: "MISCELLANEOUS COMMANDS",
Flags: utils.GroupFlags(utils.NetworkFlags, utils.DatabasePathFlags),
Description: `
geth snapshot inspect-account <address | hash> checks all snapshot layers and prints out
information about the specified address.
`,
},
{
Expand Down Expand Up @@ -535,3 +547,35 @@ func dumpState(ctx *cli.Context) error {
"elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

// checkAccount iterates the snap data layers, and looks up the given account
// across all layers.
func checkAccount(ctx *cli.Context) error {
if ctx.NArg() != 1 {
return errors.New("need <address|hash> arg")
}
var (
hash common.Hash
addr common.Address
)
switch len(ctx.Args()[0]) {
case 40, 42:
addr = common.HexToAddress(ctx.Args()[0])
hash = crypto.Keccak256Hash(addr.Bytes())
case 64, 66:
hash = common.HexToHash(ctx.Args()[0])
default:
return errors.New("malformed address or hash")
}
stack, _ := makeConfigNode(ctx)
defer stack.Close()
chaindb := utils.MakeChainDatabase(ctx, stack, true)
defer chaindb.Close()
start := time.Now()
log.Info("Checking difflayer journal", "address", addr, "hash", hash)
if err := snapshot.CheckJournalAccount(chaindb, hash); err != nil {
return err
}
log.Info("Checked the snapshot journalled storage", "time", common.PrettyDuration(time.Since(start)))
return nil
}
2 changes: 1 addition & 1 deletion core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) {
t.Fatalf("snaproot: %#x != trieroot #%x", snapRoot, trieRoot)
}
if err := CheckDanglingStorage(snap.diskdb); err != nil {
t.Fatalf("Detected dangling storages %v", err)
t.Fatalf("Detected dangling storages: %v", err)
}
}

Expand Down
185 changes: 99 additions & 86 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,44 +108,15 @@ func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, jou
// So if there is no journal, or the journal is invalid(e.g. the journal
// is not matched with disk layer; or the it's the legacy-format journal,
// etc.), we just discard all diffs and try to recover them later.
journal := rawdb.ReadSnapshotJournal(db)
if len(journal) == 0 {
log.Warn("Loaded snapshot journal", "diskroot", base.root, "diffs", "missing")
return base, generator, nil
}
r := rlp.NewStream(bytes.NewReader(journal), 0)

// Firstly, resolve the first element as the journal version
version, err := r.Uint()
var current snapshot = base
err := iterateJournal(db, func(parent common.Hash, root common.Hash, destructSet map[common.Hash]struct{}, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) error {
current = newDiffLayer(current, root, destructSet, accountData, storageData)
return nil
})
if err != nil {
log.Warn("Failed to resolve the journal version", "error", err)
return base, generator, nil
}
if version != journalVersion {
log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
return base, generator, nil
}
// Secondly, resolve the disk layer root, ensure it's continuous
// with disk layer. Note now we can ensure it's the snapshot journal
// correct version, so we expect everything can be resolved properly.
var root common.Hash
if err := r.Decode(&root); err != nil {
return nil, journalGenerator{}, errors.New("missing disk layer root")
}
// The diff journal is not matched with disk, discard them.
// It can happen that Geth crashes without persisting the latest
// diff journal.
if !bytes.Equal(root.Bytes(), base.root.Bytes()) {
log.Warn("Loaded snapshot journal", "diskroot", base.root, "diffs", "unmatched")
return base, generator, nil
}
// Load all the snapshot diffs from the journal
snapshot, err := loadDiffLayer(base, r)
if err != nil {
return nil, journalGenerator{}, err
}
log.Debug("Loaded snapshot journal", "diskroot", base.root, "diffhead", snapshot.Root())
return snapshot, generator, nil
return current, generator, nil
}

// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
Expand Down Expand Up @@ -218,57 +189,6 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int,
return snapshot, false, nil
}

// loadDiffLayer reads the next sections of a snapshot journal, reconstructing a new
// diff and verifying that it can be linked to the requested parent.
func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
// Read the next diff journal entry
var root common.Hash
if err := r.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
return parent, nil
}
return nil, fmt.Errorf("load diff root: %v", err)
}
var destructs []journalDestruct
if err := r.Decode(&destructs); err != nil {
return nil, fmt.Errorf("load diff destructs: %v", err)
}
destructSet := make(map[common.Hash]struct{})
for _, entry := range destructs {
destructSet[entry.Hash] = struct{}{}
}
var accounts []journalAccount
if err := r.Decode(&accounts); err != nil {
return nil, fmt.Errorf("load diff accounts: %v", err)
}
accountData := make(map[common.Hash][]byte)
for _, entry := range accounts {
if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
accountData[entry.Hash] = entry.Blob
} else {
accountData[entry.Hash] = nil
}
}
var storage []journalStorage
if err := r.Decode(&storage); err != nil {
return nil, fmt.Errorf("load diff storage: %v", err)
}
storageData := make(map[common.Hash]map[common.Hash][]byte)
for _, entry := range storage {
slots := make(map[common.Hash][]byte)
for i, key := range entry.Keys {
if len(entry.Vals[i]) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
slots[key] = entry.Vals[i]
} else {
slots[key] = nil
}
}
storageData[entry.Hash] = slots
}
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
}

// Journal terminates any in-progress snapshot generation, also implicitly pushing
// the progress into the database.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
Expand Down Expand Up @@ -345,3 +265,96 @@ func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
return base, nil
}

// journalCallback is a function which is invoked by iterateJournal, every
// time a difflayer is loaded from disk.
type journalCallback = func(parent common.Hash, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error

// iterateJournal iterates through the journalled difflayers, loading them from
// the database, and invoking the callback for each loaded layer.
// The order is incremental; starting with the bottom-most difflayer, going towards
// the most recent layer.
// This method returns error either if there was some error reading from disk,
// OR if the callback returns an error when invoked.
func iterateJournal(db ethdb.KeyValueReader, callback journalCallback) error {
journal := rawdb.ReadSnapshotJournal(db)
if len(journal) == 0 {
log.Warn("Loaded snapshot journal", "diffs", "missing")
return nil
}
r := rlp.NewStream(bytes.NewReader(journal), 0)
// Firstly, resolve the first element as the journal version
version, err := r.Uint()
if err != nil {
log.Warn("Failed to resolve the journal version", "error", err)
return errors.New("failed to resolve journal version")
}
if version != journalVersion {
log.Warn("Discarded the snapshot journal with wrong version", "required", journalVersion, "got", version)
return errors.New("wrong journal version")
}
// Secondly, resolve the disk layer root, ensure it's continuous
// with disk layer. Note now we can ensure it's the snapshot journal
// correct version, so we expect everything can be resolved properly.
var parent common.Hash
if err := r.Decode(&parent); err != nil {
return errors.New("missing disk layer root")
}
if baseRoot := rawdb.ReadSnapshotRoot(db); baseRoot != parent {
log.Warn("Loaded snapshot journal", "diskroot", baseRoot, "diffs", "unmatched")
return fmt.Errorf("mismatched disk and diff layers")
}
for {
var (
root common.Hash
destructs []journalDestruct
accounts []journalAccount
storage []journalStorage
destructSet = make(map[common.Hash]struct{})
accountData = make(map[common.Hash][]byte)
storageData = make(map[common.Hash]map[common.Hash][]byte)
)
// Read the next diff journal entry
if err := r.Decode(&root); err != nil {
// The first read may fail with EOF, marking the end of the journal
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("load diff root: %v", err)
}
if err := r.Decode(&destructs); err != nil {
return fmt.Errorf("load diff destructs: %v", err)
}
if err := r.Decode(&accounts); err != nil {
return fmt.Errorf("load diff accounts: %v", err)
}
if err := r.Decode(&storage); err != nil {
return fmt.Errorf("load diff storage: %v", err)
}
for _, entry := range destructs {
destructSet[entry.Hash] = struct{}{}
}
for _, entry := range accounts {
if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
accountData[entry.Hash] = entry.Blob
} else {
accountData[entry.Hash] = nil
}
}
for _, entry := range storage {
slots := make(map[common.Hash][]byte)
for i, key := range entry.Keys {
if len(entry.Vals[i]) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
slots[key] = entry.Vals[i]
} else {
slots[key] = nil
}
}
storageData[entry.Hash] = slots
}
if err := callback(parent, root, destructSet, accountData, storageData); err != nil {
return err
}
parent = root
}
}
Loading

0 comments on commit c375ee9

Please sign in to comment.