Skip to content

Commit

Permalink
Fix overwriting of atomic trie roots (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush authored Dec 1, 2023
1 parent 0a6a7a3 commit 6d7e6b8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
18 changes: 8 additions & 10 deletions plugin/evm/atomic_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type atomicSyncer struct {
// syncer is used to sync leaves from the network.
syncer *syncclient.CallbackLeafSyncer

// nextHeight is the height which key / values
// are being inserted into [atomicTrie] for
nextHeight uint64
// lastHeight is the greatest height for which key / values
// were last inserted into the [atomicTrie]
lastHeight uint64
}

// addZeros adds [common.HashLenth] zeros to [height] and returns the result as []byte
Expand All @@ -64,7 +64,7 @@ func newAtomicSyncer(client syncclient.LeafClient, atomicBackend *atomicBackend,
trie: trie,
targetRoot: targetRoot,
targetHeight: targetHeight,
nextHeight: lastCommit + 1,
lastHeight: lastCommit,
}
tasks := make(chan syncclient.LeafSyncTask, 1)
tasks <- &atomicSyncerLeafTask{atomicSyncer: atomicSyncer}
Expand All @@ -81,15 +81,13 @@ func (s *atomicSyncer) Start(ctx context.Context) error {

// onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie.
func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
_, lastCommittedHeight := s.atomicTrie.LastCommitted()
lastHeight := lastCommittedHeight // track heights so we calculate roots after each height
for i, key := range keys {
if len(key) != atomicKeyLength {
return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key))
}
// key = height + blockchainID
height := binary.BigEndian.Uint64(key[:wrappers.LongLen])
if height > lastHeight {
if height > s.lastHeight {
// If this key belongs to a new height, we commit
// the trie at the previous height before adding this key.
root, nodes := s.trie.Commit(false)
Expand All @@ -98,7 +96,7 @@ func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
}
// AcceptTrie commits the trieDB and returns [isCommit] as true
// if we have reached or crossed a commit interval.
isCommit, err := s.atomicTrie.AcceptTrie(lastHeight, root)
isCommit, err := s.atomicTrie.AcceptTrie(s.lastHeight, root)
if err != nil {
return err
}
Expand All @@ -109,7 +107,7 @@ func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
return err
}
}
lastHeight = height
s.lastHeight = height
}

if err := s.trie.Update(key, values[i]); err != nil {
Expand Down Expand Up @@ -155,7 +153,7 @@ type atomicSyncerLeafTask struct {
atomicSyncer *atomicSyncer
}

func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.nextHeight) }
func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.lastHeight + 1) }
func (a *atomicSyncerLeafTask) End() []byte { return nil }
func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return message.AtomicTrieNode }
func (a *atomicSyncerLeafTask) OnFinish(context.Context) error { return a.atomicSyncer.onFinish() }
Expand Down
36 changes: 31 additions & 5 deletions plugin/evm/atomic_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
package evm

import (
"bytes"
"context"
"fmt"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/database/versiondb"

"github.com/ava-labs/coreth/core/rawdb"
"github.com/ava-labs/coreth/ethdb/memorydb"
"github.com/ava-labs/coreth/plugin/evm/message"
syncclient "github.com/ava-labs/coreth/sync/client"
Expand Down Expand Up @@ -56,7 +59,6 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *trie.Database, targetHeight ui
if err != nil {
t.Fatal("could not initialize atomic backend", err)
}
atomicTrie := atomicBackend.AtomicTrie()

// For each checkpoint, replace the leafsIntercept to shut off the syncer at the correct point and force resume from the checkpoint's
// next trie.
Expand Down Expand Up @@ -110,14 +112,38 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *trie.Database, targetHeight ui

// we re-initialise trie DB for asserting the trie to make sure any issues with unflushed writes
// are caught here as this will only pass if all trie nodes have been written to the underlying DB
atomicTrie := atomicBackend.AtomicTrie()
clientTrieDB := atomicTrie.TrieDB()
syncutils.AssertTrieConsistency(t, targetRoot, serverTrieDB, clientTrieDB, nil)

// check all commit heights are created
for height := uint64(commitInterval); height <= targetHeight; height += commitInterval {
root, err := atomicTrie.Root(height)
// check all commit heights are created correctly
hasher := trie.NewEmpty(trie.NewDatabase(rawdb.NewMemoryDatabase()))
assert.NoError(t, err)

serverTrie, err := trie.New(trie.TrieID(targetRoot), serverTrieDB)
assert.NoError(t, err)
addAllKeysWithPrefix := func(prefix []byte) error {
it := trie.NewIterator(serverTrie.NodeIterator(prefix))
for it.Next() {
if !bytes.HasPrefix(it.Key, prefix) {
return it.Err
}
err := hasher.Update(it.Key, it.Value)
assert.NoError(t, err)
}
return it.Err
}

for height := uint64(0); height <= targetHeight; height++ {
err := addAllKeysWithPrefix(database.PackUInt64(height))
assert.NoError(t, err)
assert.NotZero(t, root)

if height%commitInterval == 0 {
expected := hasher.Hash()
root, err := atomicTrie.Root(height)
assert.NoError(t, err)
assert.Equal(t, expected, root)
}
}
}

Expand Down

0 comments on commit 6d7e6b8

Please sign in to comment.