Skip to content

Commit

Permalink
add matrix and race to UTs (#530)
Browse files Browse the repository at this point in the history
* add matrix and race to UTs

* remove unnecessary runners

* clean script

* lazy read bonus blocks

* increase timeout

* increase frequency

* apply fix fow windows

* increase timeout

* disable fail fast

* increase frequency and timeout

* use latest

* fix coma

* better log

* fix require

* Fix formatting

* fix eventually formats

* test out new wg

* Revert "test out new wg"

This reverts commit 5d4a0a8.

* check tx indexes after each block accept

* mark as flaky

* disable blobpool

* add IsSubscribed for testing

* remove subscribed when return

* use atomic bool

* attempt to fix flaky indexing test
  • Loading branch information
ceyonur authored and darioush committed Apr 25, 2024
1 parent ca4f840 commit be03c95
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 79 deletions.
5 changes: 0 additions & 5 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"fmt"
"math/big"
"os"
"path"
"strings"
"testing"

Expand All @@ -60,7 +59,6 @@ type snapshotTestBasic struct {

// share fields, set in runtime
datadir string
ancient string
db ethdb.Database
genDb ethdb.Database
engine consensus.Engine
Expand All @@ -72,7 +70,6 @@ type snapshotTestBasic struct {
func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Block) {
// Create a temporary persistent database
datadir := t.TempDir()
ancient := path.Join(datadir, "ancient")

db, err := rawdb.Open(rawdb.OpenOptions{
Directory: datadir,
Expand Down Expand Up @@ -130,7 +127,6 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo

// Set runtime fields
basic.datadir = datadir
basic.ancient = ancient
basic.db = db
basic.genDb = genDb
basic.engine = engine
Expand Down Expand Up @@ -212,7 +208,6 @@ func (basic *snapshotTestBasic) teardown() {
basic.db.Close()
basic.genDb.Close()
os.RemoveAll(basic.datadir)
os.RemoveAll(basic.ancient)
}

// snapshotTest is a test case type for normal snapshot recovery.
Expand Down
44 changes: 28 additions & 16 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,40 +689,49 @@ func TestTransactionSkipIndexing(t *testing.T) {

// test1: Init block chain and check all indices has been skipped.
chainDB := rawdb.NewMemoryDatabase()
chain, err := createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
chain, err := createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{},
func(b *types.Block) {
bNumber := b.NumberU64()
checkTxIndicesHelper(t, nil, bNumber+1, bNumber+1, bNumber, chainDB, false) // check all indices has been skipped
})
require.NoError(err)
currentBlockNumber := chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test2: specify lookuplimit with tx index skipping enabled. Blocks should not be indexed but tail should be updated.
conf.TxLookupLimit = 2
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
chainDB = rawdb.NewMemoryDatabase()
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{},
func(b *types.Block) {
bNumber := b.NumberU64()
tail := bNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, bNumber+1, bNumber+1, bNumber, chainDB, false) // check all indices has been skipped
})
require.NoError(err)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail := currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, currentBlockNumber+1, currentBlockNumber+1, currentBlockNumber, chainDB, false) // check all indices has been skipped
chain.Stop()

// test3: tx index skipping and unindexer disabled. Blocks should be indexed and tail should be updated.
conf.TxLookupLimit = 0
conf.SkipTxIndexing = false
chainDB = rawdb.NewMemoryDatabase()
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{})
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks, common.Hash{},
func(b *types.Block) {
bNumber := b.NumberU64()
checkTxIndicesHelper(t, nil, 0, bNumber, bNumber, chainDB, false) // check all indices has been indexed
})
require.NoError(err)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
checkTxIndicesHelper(t, nil, 0, currentBlockNumber, currentBlockNumber, chainDB, false) // check all indices has been indexed
chain.Stop()

// now change tx index skipping to true and check that the indices are skipped for the last block
// and old indices are removed up to the tail, but [tail, current) indices are still there.
conf.TxLookupLimit = 2
conf.SkipTxIndexing = true
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash())
chain, err = createAndInsertChain(chainDB, conf, gspec, blocks2[0:1], chain.CurrentHeader().Hash(),
func(b *types.Block) {
bNumber := b.NumberU64()
tail := bNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, tail, bNumber-1, bNumber, chainDB, false)
})
require.NoError(err)
currentBlockNumber = chain.CurrentBlock().Number.Uint64()
tail = currentBlockNumber - conf.TxLookupLimit + 1
checkTxIndicesHelper(t, &tail, tail, currentBlockNumber-1, currentBlockNumber, chainDB, false)
chain.Stop()
}

Expand Down Expand Up @@ -1299,7 +1308,7 @@ func TestEIP3651(t *testing.T) {
}
}

func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Genesis, blocks types.Blocks, lastAcceptedHash common.Hash) (*BlockChain, error) {
func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Genesis, blocks types.Blocks, lastAcceptedHash common.Hash, accepted func(*types.Block)) (*BlockChain, error) {
chain, err := createBlockChain(db, cacheConfig, gspec, lastAcceptedHash)
if err != nil {
return nil, err
Expand All @@ -1313,8 +1322,11 @@ func createAndInsertChain(db ethdb.Database, cacheConfig *CacheConfig, gspec *Ge
if err != nil {
return nil, err
}
chain.DrainAcceptorQueue()
if accepted != nil {
accepted(block)
}
}
chain.DrainAcceptorQueue()

return chain, nil
}
17 changes: 9 additions & 8 deletions core/test_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1478,18 +1479,18 @@ func CheckTxIndices(t *testing.T, expectedTail *uint64, head uint64, db ethdb.Da
// [indexedTo] is the block number to which the transactions should be indexed.
// [head] is the block number of the head block.
func checkTxIndicesHelper(t *testing.T, expectedTail *uint64, indexedFrom uint64, indexedTo uint64, head uint64, db ethdb.Database, allowNilBlocks bool) {
require := require.New(t)
if expectedTail == nil {
require.Nil(rawdb.ReadTxIndexTail(db))
require.Nil(t, rawdb.ReadTxIndexTail(db))
} else {
var stored uint64
tailValue := *expectedTail
require.Eventually(
func() bool {

require.EventuallyWithTf(t,
func(c *assert.CollectT) {
stored = *rawdb.ReadTxIndexTail(db)
return tailValue == stored
require.Equalf(t, tailValue, stored, "expected tail to be %d, found %d", tailValue, stored)
},
10*time.Second, 100*time.Millisecond, "expected tail to be %d eventually (was %d)", tailValue, stored)
30*time.Second, 500*time.Millisecond, "expected tail to be %d eventually", tailValue)
}

for i := uint64(0); i <= head; i++ {
Expand All @@ -1500,9 +1501,9 @@ func checkTxIndicesHelper(t *testing.T, expectedTail *uint64, indexedFrom uint64
for _, tx := range block.Transactions() {
index := rawdb.ReadTxLookupEntry(db, tx.Hash())
if i < indexedFrom {
require.Nilf(index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
require.Nilf(t, index, "Transaction indices should be deleted, number %d hash %s", i, tx.Hash().Hex())
} else if i <= indexedTo {
require.NotNilf(index, "Missing transaction indices, number %d hash %s", i, tx.Hash().Hex())
require.NotNilf(t, index, "Missing transaction indices, number %d hash %s", i, tx.Hash().Hex())
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion metrics/opentsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestExampleOpenTSB(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if have, want := w.String(), string(wantB); have != want {
want := strings.ReplaceAll(string(wantB), "\r\n", "\n")
if have := w.String(); have != want {
t.Errorf("\nhave:\n%v\nwant:\n%v\n", have, want)
t.Logf("have vs want:\n%v", findFirstDiffPos(have, want))
}
Expand Down
6 changes: 6 additions & 0 deletions plugin/evm/atomic_trie_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type atomicTrieRepairTest struct {

func TestAtomicTrieRepair(t *testing.T) {
require := require.New(t)
mainnetBonusBlocksParsed, bonusBlockMainnetHeights, err := readMainnetBonusBlocks()
require.NoError(err)
for name, test := range map[string]atomicTrieRepairTest{
"needs repair": {
setup: func(a *atomicTrie, db *versiondb.Database) {},
Expand Down Expand Up @@ -65,6 +67,8 @@ func (test atomicTrieRepairTest) test(t *testing.T) {
a := atomicBackend.AtomicTrie().(*atomicTrie)

// make a commit at a height larger than all bonus blocks
mainnetBonusBlocksParsed, bonusBlockMainnetHeights, err := readMainnetBonusBlocks()
require.NoError(err)
maxBonusBlockHeight := slices.Max(maps.Keys(mainnetBonusBlocksParsed))
commitHeight := nearestCommitHeight(maxBonusBlockHeight, commitInterval) + commitInterval
err = a.commit(commitHeight, types.EmptyRootHash)
Expand Down Expand Up @@ -98,6 +102,8 @@ func verifyAtomicTrieIsAlreadyRepaired(
// and empty slices as equal
expectedKeys := 0
expected := make(map[uint64]map[ids.ID][]byte)
mainnetBonusBlocksParsed, bonusBlockMainnetHeights, err := readMainnetBonusBlocks()
require.NoError(err)
for height, block := range mainnetBonusBlocksParsed {
txs, err := ExtractAtomicTxs(block.ExtData(), false, Codec)
require.NoError(err)
Expand Down
34 changes: 17 additions & 17 deletions plugin/evm/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,19 @@ var (
)

var (
bonusBlockMainnetHeights = make(map[uint64]ids.ID)

//go:embed bonus_blocks.json
mainnetBonusBlocksJson []byte

// mainnetBonusBlocksParsed is a map of bonus block numbers to the parsed
// data. These blocks are hardcoded so nodes that do not have these blocks
// can add their atomic operations to the atomic trie so all nodes on have a
// canonical atomic trie.
// Initially, bonus blocks were not indexed into the atomic trie. However, a
// regression caused some nodes to index these blocks.
mainnetBonusBlocksParsed map[uint64]*types.Block = make(map[uint64]*types.Block)

errMissingUTXOs = errors.New("missing UTXOs")
)

func init() {
// readMainnetBonusBlocks returns maps of bonus block numbers to the parsed
// data and block numbers to block IDs. These blocks are hardcoded so nodes that do not have these blocks
// can add their atomic operations to the atomic trie so all nodes on have a
// canonical atomic trie.
// Initially, bonus blocks were not indexed into the atomic trie. However, a
// regression caused some nodes to index these blocks.
func readMainnetBonusBlocks() (map[uint64]*types.Block, map[uint64]ids.ID, error) {
mainnetBonusBlocks := map[uint64]string{
102972: "Njm9TcLUXRojZk8YhEM6ksvfiPdC1TME4zJvGaDXgzMCyB6oB",
103105: "BYqLB6xpqy7HsAgP2XNfGE8Ubg1uEzse5mBPTSJH9z5s8pvMa",
Expand Down Expand Up @@ -112,37 +108,41 @@ func init() {
103633: "2QiHZwLhQ3xLuyyfcdo5yCUfoSqWDvRZox5ECU19HiswfroCGp",
}

bonusBlockMainnetHeights := make(map[uint64]ids.ID)
for height, blkIDStr := range mainnetBonusBlocks {
blkID, err := ids.FromString(blkIDStr)
if err != nil {
panic(err)
return nil, nil, err
}
bonusBlockMainnetHeights[height] = blkID
}

var rlpMap map[uint64]string
mainnetBonusBlocksParsed := make(map[uint64]*types.Block)
err := json.Unmarshal(mainnetBonusBlocksJson, &rlpMap)
if err != nil {
panic(err)
return nil, nil, err
}
for height, rlpHex := range rlpMap {
expectedHash, ok := bonusBlockMainnetHeights[height]
if !ok {
panic(fmt.Sprintf("missing bonus block at height %d", height))
return nil, nil, fmt.Errorf("missing bonus block at height %d", height)
}
var ethBlock types.Block
if err := rlp.DecodeBytes(common.Hex2Bytes(rlpHex), &ethBlock); err != nil {
panic(fmt.Sprintf("failed to decode bonus block at height %d: %s", height, err))
return nil, nil, fmt.Errorf("failed to decode bonus block at height %d: %s", height, err)
}
if ids.ID(ethBlock.Hash()) != expectedHash {
panic(fmt.Sprintf("block ID mismatch at (%s != %s)", ids.ID(ethBlock.Hash()), expectedHash))
return nil, nil, fmt.Errorf("block ID mismatch at (%s != %s)", ids.ID(ethBlock.Hash()), expectedHash)
}

mainnetBonusBlocksParsed[height] = &ethBlock
}
if len(mainnetBonusBlocksParsed) != len(bonusBlockMainnetHeights) {
panic("mismatched bonus block heights")
return nil, nil, errors.New("mismatched bonus block heights")
}

return mainnetBonusBlocksParsed, bonusBlockMainnetHeights, nil
}

// Block implements the snowman.Block interface
Expand Down
17 changes: 16 additions & 1 deletion plugin/evm/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -132,10 +133,24 @@ type GossipEthTxPool struct {

bloom *gossip.BloomFilter
lock sync.RWMutex

// subscribed is set to true when the gossip subscription is active
// mostly used for testing
subscribed atomic.Bool
}

// IsSubscribed returns whether or not the gossip subscription is active.
func (g *GossipEthTxPool) IsSubscribed() bool {
return g.subscribed.Load()
}

func (g *GossipEthTxPool) Subscribe(ctx context.Context) {
g.mempool.SubscribeNewTxsEvent(g.pendingTxs)
sub := g.mempool.SubscribeNewTxsEvent(g.pendingTxs)
g.subscribed.CompareAndSwap(false, true)
defer func() {
sub.Unsubscribe()
g.subscribed.CompareAndSwap(true, false)
}()

for {
select {
Expand Down
20 changes: 11 additions & 9 deletions plugin/evm/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -169,6 +170,10 @@ func TestGossipSubscribe(t *testing.T) {
defer cancel()
go gossipTxPool.Subscribe(ctx)

require.Eventually(func() bool {
return gossipTxPool.IsSubscribed()
}, 10*time.Second, 500*time.Millisecond, "expected gossipTxPool to be subscribed")

// create eth txs
ethTxs := getValidEthTxs(key, 10, big.NewInt(226*params.GWei))

Expand All @@ -178,20 +183,17 @@ func TestGossipSubscribe(t *testing.T) {
require.NoError(err, "failed adding tx to remote mempool")
}

require.Eventually(
func() bool {
require.EventuallyWithTf(
func(c *assert.CollectT) {
gossipTxPool.lock.RLock()
defer gossipTxPool.lock.RUnlock()

for _, tx := range ethTxs {
if !gossipTxPool.bloom.Has(&GossipEthTx{Tx: tx}) {
return false
}
for i, tx := range ethTxs {
require.Truef(gossipTxPool.bloom.Has(&GossipEthTx{Tx: tx}), "expected tx[%d] to be in bloom filter", i)
}
return true
},
10*time.Second,
10*time.Millisecond,
30*time.Second,
500*time.Millisecond,
"expected all transactions to eventually be in the bloom filter",
)
}
Expand Down
Loading

0 comments on commit be03c95

Please sign in to comment.