Commit

Fix TestVMShutdownWhileSyncing (#323)
* Fix TestVMShutdownWhileSyncing

* fix
darioush authored Sep 11, 2023
1 parent 347fe53 commit 545b4f1
Showing 1 changed file with 85 additions and 141 deletions.
226 changes: 85 additions & 141 deletions plugin/evm/syncervm_test.go
@@ -17,6 +17,7 @@ import (

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database/manager"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/choices"
@@ -52,7 +53,6 @@ func TestSkipStateSync(t *testing.T) {
syncMode: block.StateSyncSkipped,
}
vmSetup := createSyncServerAndClientVMs(t, test)
defer vmSetup.Teardown(t)

testSyncerVM(t, vmSetup, test)
}
@@ -65,7 +65,6 @@ func TestStateSyncFromScratch(t *testing.T) {
syncMode: block.StateSyncStatic,
}
vmSetup := createSyncServerAndClientVMs(t, test)
defer vmSetup.Teardown(t)

testSyncerVM(t, vmSetup, test)
}
@@ -107,7 +106,6 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) {
expectedErr: context.Canceled,
}
vmSetup := createSyncServerAndClientVMs(t, test)
defer vmSetup.Teardown(t)

// Perform sync resulting in early termination.
testSyncerVM(t, vmSetup, test)
@@ -250,61 +248,34 @@ func TestVMShutdownWhileSyncing(t *testing.T) {
// Shutdown the VM after 50 requests to interrupt the sync
if reqCount == 50 {
// Note this verifies the VM shutdown does not time out while syncing.
require.NoError(t, vmSetup.syncerVM.Shutdown(context.Background()))
require.NoError(t, vmSetup.shutdownOnceSyncerVM.Shutdown(context.Background()))
} else if reqCount < 50 {
syncerVM.AppResponse(context.Background(), nodeID, requestID, response)
err := syncerVM.AppResponse(context.Background(), nodeID, requestID, response)
require.NoError(t, err)
}
},
expectedErr: context.Canceled,
}
vmSetup = createSyncServerAndClientVMs(t, test)
defer func() {
require.NoError(t, vmSetup.serverVM.Shutdown(context.Background()))
}()

// Perform sync resulting in early termination.
testSyncerVM(t, vmSetup, test)
}

func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
var (
serverVM, syncerVM *VM
)
// If there is an error shutdown the VMs if they have been instantiated
defer func() {
// If the test has not already failed, shut down the VMs since the caller
// will not get the chance to shut them down.
if !t.Failed() {
return
}

// If the test already failed, shut down the VMs if they were instantiated.
if serverVM != nil {
log.Info("Shutting down server VM")
if err := serverVM.Shutdown(context.Background()); err != nil {
t.Fatal(err)
}
}
if syncerVM != nil {
log.Info("Shutting down syncerVM")
if err := syncerVM.Shutdown(context.Background()); err != nil {
t.Fatal(err)
}
require = require.New(t)
importAmount = 2000000 * units.Avax // 2M avax
alloc = map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
}
}()

// configure [serverVM]
importAmount := 2000000 * units.Avax // 2M avax
)
_, serverVM, _, serverAtomicMemory, serverAppSender := GenesisVMWithUTXOs(
t,
true,
"",
"",
"",
map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
},
t, true, "", "", "", alloc,
)
t.Cleanup(func() {
log.Info("Shutting down server VM")
require.NoError(serverVM.Shutdown(context.Background()))
})

var (
importTx, exportTx *Tx
@@ -315,12 +286,8 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
case 0:
// spend the UTXOs from shared memory
importTx, err = serverVM.newImportTx(serverVM.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]})
if err != nil {
t.Fatal(err)
}
if err := serverVM.issueTx(importTx, true /*=local*/); err != nil {
t.Fatal(err)
}
require.NoError(err)
require.NoError(serverVM.issueTx(importTx, true /*=local*/))
case 1:
// export some of the imported UTXOs to test exportTx is properly synced
exportTx, err = serverVM.newExportTx(
@@ -331,19 +298,13 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
initialBaseFee,
[]*secp256k1.PrivateKey{testKeys[0]},
)
if err != nil {
t.Fatal(err)
}
if err := serverVM.issueTx(exportTx, true /*=local*/); err != nil {
t.Fatal(err)
}
require.NoError(err)
require.NoError(serverVM.issueTx(exportTx, true /*=local*/))
default: // Generate simple transfer transactions.
pk := testKeys[0].ToECDSA()
tx := types.NewTransaction(gen.TxNonce(testEthAddrs[0]), testEthAddrs[1], common.Big1, params.TxGas, initialBaseFee, nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(serverVM.chainID), pk)
if err != nil {
t.Fatal(t)
}
require.NoError(err)
gen.AddTx(signedTx)
}
})
@@ -353,8 +314,8 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
// fetching a state summary.
serverAtomicTrie := serverVM.atomicTrie.(*atomicTrie)
serverAtomicTrie.commitInterval = test.syncableInterval
assert.NoError(t, serverAtomicTrie.commit(test.syncableInterval, serverAtomicTrie.LastAcceptedRoot()))
assert.NoError(t, serverVM.db.Commit())
require.NoError(serverAtomicTrie.commit(test.syncableInterval, serverAtomicTrie.LastAcceptedRoot()))
require.NoError(serverVM.db.Commit())

serverSharedMemories := newSharedMemories(serverAtomicMemory, serverVM.ctx.ChainID, serverVM.ctx.XChainID)
serverSharedMemories.assertOpsApplied(t, importTx.mustAtomicOps())
@@ -370,37 +331,28 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
lastAccepted := serverVM.blockChain.LastAcceptedBlock()
patchedBlock := patchBlock(lastAccepted, root, serverVM.chaindb)
blockBytes, err := rlp.EncodeToBytes(patchedBlock)
if err != nil {
t.Fatal(err)
}
require.NoError(err)
internalBlock, err := serverVM.parseBlock(context.Background(), blockBytes)
if err != nil {
t.Fatal(err)
}
require.NoError(err)
internalBlock.(*Block).SetStatus(choices.Accepted)
assert.NoError(t, serverVM.State.SetLastAcceptedBlock(internalBlock))
require.NoError(serverVM.State.SetLastAcceptedBlock(internalBlock))

// patch syncableInterval for test
serverVM.StateSyncServer.(*stateSyncServer).syncableInterval = test.syncableInterval

// initialise [syncerVM] with blank genesis state
stateSyncEnabledJSON := fmt.Sprintf("{\"state-sync-enabled\":true, \"state-sync-min-blocks\": %d}", test.stateSyncMinBlocks)
stateSyncEnabledJSON := fmt.Sprintf(`{"state-sync-enabled":true, "state-sync-min-blocks": %d}`, test.stateSyncMinBlocks)
syncerEngineChan, syncerVM, syncerDBManager, syncerAtomicMemory, syncerAppSender := GenesisVMWithUTXOs(
t,
false,
"",
stateSyncEnabledJSON,
"",
map[ids.ShortID]uint64{
testShortIDAddrs[0]: importAmount,
},
t, false, "", stateSyncEnabledJSON, "", alloc,
)
if err := syncerVM.SetState(context.Background(), snow.StateSyncing); err != nil {
t.Fatal(err)
}
shutdownOnceSyncerVM := &shutdownOnceVM{VM: syncerVM}
t.Cleanup(func() {
require.NoError(shutdownOnceSyncerVM.Shutdown(context.Background()))
})
require.NoError(syncerVM.SetState(context.Background(), snow.StateSyncing))
enabled, err := syncerVM.StateSyncEnabled(context.Background())
assert.NoError(t, err)
assert.True(t, enabled)
require.NoError(err)
require.True(enabled)

// override [syncerVM]'s commit interval so the atomic trie works correctly.
syncerVM.atomicTrie.(*atomicTrie).commitInterval = test.syncableInterval
@@ -417,19 +369,20 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
}

// connect peer to [syncerVM]
assert.NoError(t, syncerVM.Connected(
context.Background(),
serverVM.ctx.NodeID,
statesyncclient.StateSyncVersion,
))
require.NoError(
syncerVM.Connected(
context.Background(),
serverVM.ctx.NodeID,
statesyncclient.StateSyncVersion,
),
)

// override [syncerVM]'s SendAppRequest function to trigger AppRequest on [serverVM]
syncerAppSender.SendAppRequestF = func(ctx context.Context, nodeSet set.Set[ids.NodeID], requestID uint32, request []byte) error {
nodeID, hasItem := nodeSet.Pop()
if !hasItem {
t.Fatal("expected nodeSet to contain at least 1 nodeID")
}
go serverVM.AppRequest(ctx, nodeID, requestID, time.Now().Add(1*time.Second), request)
require.True(hasItem, "expected nodeSet to contain at least 1 nodeID")
err := serverVM.AppRequest(ctx, nodeID, requestID, time.Now().Add(1*time.Second), request)
require.NoError(err)
return nil
}

@@ -440,11 +393,12 @@ func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
importTx,
exportTx,
},
fundedAccounts: accounts,
syncerVM: syncerVM,
syncerDBManager: syncerDBManager,
syncerEngineChan: syncerEngineChan,
syncerAtomicMemory: syncerAtomicMemory,
fundedAccounts: accounts,
syncerVM: syncerVM,
syncerDBManager: syncerDBManager,
syncerEngineChan: syncerEngineChan,
syncerAtomicMemory: syncerAtomicMemory,
shutdownOnceSyncerVM: shutdownOnceSyncerVM,
}
}

@@ -457,17 +411,22 @@ type syncVMSetup struct {
includedAtomicTxs []*Tx
fundedAccounts map[*keystore.Key]*types.StateAccount

syncerVM *VM
syncerDBManager manager.Manager
syncerEngineChan <-chan commonEng.Message
syncerAtomicMemory *atomic.Memory
syncerVM *VM
syncerDBManager manager.Manager
syncerEngineChan <-chan commonEng.Message
syncerAtomicMemory *atomic.Memory
shutdownOnceSyncerVM *shutdownOnceVM
}

// Teardown shuts down both VMs and asserts that both exit without error.
// Note: assumes both serverVM and syncerVM have been initialized.
func (s *syncVMSetup) Teardown(t *testing.T) {
assert.NoError(t, s.serverVM.Shutdown(context.Background()))
assert.NoError(t, s.syncerVM.Shutdown(context.Background()))
type shutdownOnceVM struct {
*VM
shutdownOnce sync.Once
}

func (vm *shutdownOnceVM) Shutdown(ctx context.Context) error {
var err error
vm.shutdownOnce.Do(func() { err = vm.VM.Shutdown(ctx) })
return err
}

// syncTest contains both the actual VMs as well as the parameters with the expected output.
@@ -482,61 +441,50 @@ type syncTest struct {
func testSyncerVM(t *testing.T, vmSetup *syncVMSetup, test syncTest) {
t.Helper()
var (
require = require.New(t)
serverVM = vmSetup.serverVM
includedAtomicTxs = vmSetup.includedAtomicTxs
fundedAccounts = vmSetup.fundedAccounts
syncerVM = vmSetup.syncerVM
syncerEngineChan = vmSetup.syncerEngineChan
syncerAtomicMemory = vmSetup.syncerAtomicMemory
)

// get last summary and test related methods
summary, err := serverVM.GetLastStateSummary(context.Background())
if err != nil {
t.Fatal("error getting state sync last summary", "err", err)
}
require.NoError(err, "error getting state sync last summary")
parsedSummary, err := syncerVM.ParseStateSummary(context.Background(), summary.Bytes())
if err != nil {
t.Fatal("error getting state sync last summary", "err", err)
}
require.NoError(err, "error parsing state summary")
retrievedSummary, err := serverVM.GetStateSummary(context.Background(), parsedSummary.Height())
if err != nil {
t.Fatal("error when checking if summary is accepted", "err", err)
}
assert.Equal(t, summary, retrievedSummary)
require.NoError(err, "error getting state sync summary at height")
require.Equal(summary, retrievedSummary)

syncMode, err := parsedSummary.Accept(context.Background())
if err != nil {
t.Fatal("unexpected error accepting state summary", "err", err)
}
if syncMode != test.syncMode {
t.Fatal("unexpected value returned from accept", "expected", test.syncMode, "got", syncMode)
}
require.NoError(err, "error accepting state summary")
require.Equal(syncMode, test.syncMode)
if syncMode == block.StateSyncSkipped {
return
}

msg := <-syncerEngineChan
assert.Equal(t, commonEng.StateSyncDone, msg)
require.Equal(commonEng.StateSyncDone, msg)

// If the test is expected to error, assert the correct error is returned and finish the test.
err = syncerVM.StateSyncClient.Error()
if test.expectedErr != nil {
assert.ErrorIs(t, err, test.expectedErr)
assertSyncPerformedHeights(t, syncerVM.chaindb, map[uint64]struct{}{})
require.ErrorIs(err, test.expectedErr)
// Note we re-open the database here to avoid a closed error when the test is for a shutdown VM.
chaindb := Database{prefixdb.NewNested(ethDBPrefix, syncerVM.db)}
assertSyncPerformedHeights(t, chaindb, map[uint64]struct{}{})
return
}
if err != nil {
t.Fatal("state sync failed", err)
}
require.NoError(err, "state sync failed")

// set [syncerVM] to bootstrapping and verify the last accepted block has been updated correctly
// and that we can bootstrap and process some blocks.
if err := syncerVM.SetState(context.Background(), snow.Bootstrapping); err != nil {
t.Fatal(err)
}
assert.Equal(t, serverVM.LastAcceptedBlock().Height(), syncerVM.LastAcceptedBlock().Height(), "block height mismatch between syncer and server")
assert.Equal(t, serverVM.LastAcceptedBlock().ID(), syncerVM.LastAcceptedBlock().ID(), "blockID mismatch between syncer and server")
assert.True(t, syncerVM.blockChain.HasState(syncerVM.blockChain.LastAcceptedBlock().Root()), "unavailable state for last accepted block")
require.NoError(syncerVM.SetState(context.Background(), snow.Bootstrapping))
require.Equal(serverVM.LastAcceptedBlock().Height(), syncerVM.LastAcceptedBlock().Height(), "block height mismatch between syncer and server")
require.Equal(serverVM.LastAcceptedBlock().ID(), syncerVM.LastAcceptedBlock().ID(), "blockID mismatch between syncer and server")
require.True(syncerVM.blockChain.HasState(syncerVM.blockChain.LastAcceptedBlock().Root()), "unavailable state for last accepted block")
assertSyncPerformedHeights(t, syncerVM.chaindb, map[uint64]struct{}{retrievedSummary.Height(): {}})

blocksToBuild := 10
@@ -547,9 +495,7 @@ func testSyncerVM(t *testing.T, vmSetup *syncVMSetup, test syncTest) {
for k := range fundedAccounts {
tx := types.NewTransaction(gen.TxNonce(k.Address), toAddress, big.NewInt(1), 21000, initialBaseFee, nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(serverVM.chainID), k.PrivateKey)
if err != nil {
t.Fatal(err)
}
require.NoError(err)
gen.AddTx(signedTx)
i++
if i >= txsPerBlock {
Expand All @@ -559,8 +505,8 @@ func testSyncerVM(t *testing.T, vmSetup *syncVMSetup, test syncTest) {
})

// check we can transition to [NormalOp] state and continue to process blocks.
assert.NoError(t, syncerVM.SetState(context.Background(), snow.NormalOp))
assert.True(t, syncerVM.bootstrapped)
require.NoError(syncerVM.SetState(context.Background(), snow.NormalOp))
require.True(syncerVM.bootstrapped)

// check atomic memory was synced properly
syncerSharedMemories := newSharedMemories(syncerAtomicMemory, syncerVM.ctx.ChainID, syncerVM.ctx.XChainID)
@@ -575,9 +521,7 @@ func testSyncerVM(t *testing.T, vmSetup *syncVMSetup, test syncTest) {
for k := range fundedAccounts {
tx := types.NewTransaction(gen.TxNonce(k.Address), toAddress, big.NewInt(1), 21000, initialBaseFee, nil)
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(serverVM.chainID), k.PrivateKey)
if err != nil {
t.Fatal(err)
}
require.NoError(err)
gen.AddTx(signedTx)
i++
if i >= txsPerBlock {
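The core of this change, visible in the hunks above, is that the old syncVMSetup.Teardown helper is replaced with t.Cleanup hooks and a shutdownOnceVM wrapper, so TestVMShutdownWhileSyncing can shut the syncer VM down in the middle of syncing without the cleanup path shutting it down a second time. The sketch below is a minimal, self-contained illustration of that pattern, not code from this repository; the names fakeVM, shutdownOnce, and errAlreadyClosed are assumptions made for the example, and the file would need a _test.go suffix to run under go test.

package vmtest

import (
	"context"
	"errors"
	"sync"
	"testing"
)

// fakeVM stands in for a VM whose Shutdown must not run twice.
type fakeVM struct{ closed bool }

var errAlreadyClosed = errors.New("shutdown called twice")

func (vm *fakeVM) Shutdown(ctx context.Context) error {
	if vm.closed {
		return errAlreadyClosed
	}
	vm.closed = true
	return nil
}

// shutdownOnce makes Shutdown idempotent: only the first call reaches the
// wrapped VM; later calls do nothing and return nil.
type shutdownOnce struct {
	vm   *fakeVM
	once sync.Once
}

func (s *shutdownOnce) Shutdown(ctx context.Context) error {
	var err error
	s.once.Do(func() { err = s.vm.Shutdown(ctx) })
	return err
}

func TestShutdownMidTestAndInCleanup(t *testing.T) {
	vm := &shutdownOnce{vm: &fakeVM{}}

	// The cleanup hook always runs, even when the test body has already
	// shut the VM down.
	t.Cleanup(func() {
		if err := vm.Shutdown(context.Background()); err != nil {
			t.Errorf("cleanup shutdown: %v", err)
		}
	})

	// The test body can shut down early (for example, to interrupt a sync)
	// without making the cleanup hook fail.
	if err := vm.Shutdown(context.Background()); err != nil {
		t.Fatalf("early shutdown: %v", err)
	}
}

Because the wrapper only surfaces an error on the first call and returns nil afterwards, the t.Cleanup hook can call Shutdown unconditionally, which is the property the reworked test relies on when it interrupts the sync after 50 requests.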
