Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix overwriting of atomic trie roots #407

Merged
merged 4 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions plugin/evm/atomic_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type atomicSyncer struct {
// syncer is used to sync leaves from the network.
syncer *syncclient.CallbackLeafSyncer

// nextHeight is the height which key / values
// are being inserted into [atomicTrie] for
nextHeight uint64
// lastHeight is the greatest height for which key / values
// were last inserted into the [atomicTrie]
lastHeight uint64
}

// addZeros adds [common.HashLenth] zeros to [height] and returns the result as []byte
Expand All @@ -64,7 +64,7 @@ func newAtomicSyncer(client syncclient.LeafClient, atomicBackend *atomicBackend,
trie: trie,
targetRoot: targetRoot,
targetHeight: targetHeight,
nextHeight: lastCommit + 1,
lastHeight: lastCommit,
}
tasks := make(chan syncclient.LeafSyncTask, 1)
tasks <- &atomicSyncerLeafTask{atomicSyncer: atomicSyncer}
Expand All @@ -81,15 +81,13 @@ func (s *atomicSyncer) Start(ctx context.Context) error {

// onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie.
func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
_, lastCommittedHeight := s.atomicTrie.LastCommitted()
lastHeight := lastCommittedHeight // track heights so we calculate roots after each height
for i, key := range keys {
if len(key) != atomicKeyLength {
return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key))
}
// key = height + blockchainID
height := binary.BigEndian.Uint64(key[:wrappers.LongLen])
if height > lastHeight {
if height > s.lastHeight {
// If this key belongs to a new height, we commit
// the trie at the previous height before adding this key.
root, nodes := s.trie.Commit(false)
Expand All @@ -98,7 +96,7 @@ func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
}
// AcceptTrie commits the trieDB and returns [isCommit] as true
// if we have reached or crossed a commit interval.
isCommit, err := s.atomicTrie.AcceptTrie(lastHeight, root)
isCommit, err := s.atomicTrie.AcceptTrie(s.lastHeight, root)
if err != nil {
return err
}
Expand All @@ -109,7 +107,7 @@ func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
return err
}
}
lastHeight = height
s.lastHeight = height
}

if err := s.trie.Update(key, values[i]); err != nil {
Expand Down Expand Up @@ -155,7 +153,7 @@ type atomicSyncerLeafTask struct {
atomicSyncer *atomicSyncer
}

func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.nextHeight) }
func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.lastHeight + 1) }
darioush marked this conversation as resolved.
Show resolved Hide resolved
func (a *atomicSyncerLeafTask) End() []byte { return nil }
func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return message.AtomicTrieNode }
func (a *atomicSyncerLeafTask) OnFinish(context.Context) error { return a.atomicSyncer.onFinish() }
Expand Down
36 changes: 31 additions & 5 deletions plugin/evm/atomic_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
package evm

import (
"bytes"
"context"
"fmt"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/database/versiondb"

"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/ethdb/memorydb"
"github.com/ava-labs/coreth/plugin/evm/message"
syncclient "github.com/ava-labs/coreth/sync/client"
Expand Down Expand Up @@ -56,7 +59,6 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *trie.Database, targetHeight ui
if err != nil {
t.Fatal("could not initialize atomic backend", err)
}
atomicTrie := atomicBackend.AtomicTrie()

// For each checkpoint, replace the leafsIntercept to shut off the syncer at the correct point and force resume from the checkpoint's
// next trie.
Expand Down Expand Up @@ -110,14 +112,38 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *trie.Database, targetHeight ui

// we re-initialise trie DB for asserting the trie to make sure any issues with unflushed writes
// are caught here as this will only pass if all trie nodes have been written to the underlying DB
atomicTrie := atomicBackend.AtomicTrie()
clientTrieDB := atomicTrie.TrieDB()
syncutils.AssertTrieConsistency(t, targetRoot, serverTrieDB, clientTrieDB, nil)

// check all commit heights are created
for height := uint64(commitInterval); height <= targetHeight; height += commitInterval {
root, err := atomicTrie.Root(height)
// check all commit heights are created correctly
hasher := trie.NewEmpty(trie.NewDatabase(rawdb.NewMemoryDatabase()))
assert.NoError(t, err)

serverTrie, err := trie.New(trie.TrieID(targetRoot), serverTrieDB)
assert.NoError(t, err)
addAllKeysWithPrefix := func(prefix []byte) error {
it := trie.NewIterator(serverTrie.NodeIterator(prefix))
for it.Next() {
if !bytes.HasPrefix(it.Key, prefix) {
return it.Err
}
err := hasher.Update(it.Key, it.Value)
assert.NoError(t, err)
}
return it.Err
}

for height := uint64(0); height <= targetHeight; height++ {
err := addAllKeysWithPrefix(database.PackUInt64(height))
assert.NoError(t, err)
assert.NotZero(t, root)

if height%commitInterval == 0 {
expected := hasher.Hash()
root, err := atomicTrie.Root(height)
assert.NoError(t, err)
assert.Equal(t, expected, root)
}
}
}

Expand Down
Loading