diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 29be40f192..bc3dd52eb6 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -630,6 +630,21 @@ func (pool *TxPool) PendingSize() int { return count } +// IteratePending iterates over [pool.pending] until [f] returns false. +// The caller must not modify [tx]. +func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) { + pool.mu.RLock() + defer pool.mu.RUnlock() + + for _, list := range pool.pending { + for _, tx := range list.txs.items { + if !f(tx) { + return + } + } + } +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/go.mod b/go.mod index 65a8c045ba..b4ee923abf 100644 --- a/go.mod +++ b/go.mod @@ -38,12 +38,14 @@ require ( github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa go.uber.org/goleak v1.2.1 + go.uber.org/mock v0.2.0 golang.org/x/crypto v0.1.0 golang.org/x/exp v0.0.0-20230206171751-46f607a40771 golang.org/x/sync v0.1.0 golang.org/x/sys v0.8.0 golang.org/x/text v0.8.0 golang.org/x/time v0.0.0-20220922220347-f3bd1da661af + google.golang.org/protobuf v1.30.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -126,7 +128,6 @@ require ( go.opentelemetry.io/otel/trace v1.11.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/mock v0.2.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.8.0 // indirect @@ -134,7 +135,6 @@ require ( gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect google.golang.org/grpc v1.55.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 618ffefbe5..810df82f35 100644 --- a/go.sum +++ b/go.sum @@ -55,10 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLWHbMfIWkHbMI= -github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw= -github.com/ava-labs/avalanchego v1.10.9 h1:qxhp3YoD2Wm/iIKP6Wb1isbkUPWmIrJxWgivDoL0obM= -github.com/ava-labs/avalanchego v1.10.9/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw= +github.com/ava-labs/avalanchego v1.10.10-rc.1 h1:dPJISEWqL3tdUShe6RuB8CFuXl3rsH8617sXbLBjkIE= +github.com/ava-labs/avalanchego v1.10.10-rc.1/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw= github.com/ava-labs/avalanchego v1.10.10-rc.2 h1:nlHc1JwKb5TEc9oqPU2exvOpazhxr11N2ym/LzYxv4k= github.com/ava-labs/avalanchego v1.10.10-rc.2/go.mod h1:BN97sZppDSvIMIfEjrLTjdPTFkGLkb0ISJHEcoxMMNk= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/plugin/evm/gossip_mempool.go b/plugin/evm/gossip_mempool.go new file mode 100644 index 0000000000..bd74d8c5e0 --- /dev/null +++ b/plugin/evm/gossip_mempool.go @@ -0,0 +1,137 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "context" + "fmt" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ethereum/go-ethereum/log" + + "github.com/ava-labs/avalanchego/network/p2p/gossip" + + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/core/txpool" + "github.com/ava-labs/coreth/core/types" +) + +var ( + _ gossip.Gossipable = (*GossipEthTx)(nil) + _ gossip.Gossipable = (*GossipAtomicTx)(nil) + _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) +) + +type GossipAtomicTx struct { + Tx *Tx +} + +func (tx *GossipAtomicTx) GetID() ids.ID { + return tx.Tx.ID() +} + +func (tx *GossipAtomicTx) Marshal() ([]byte, error) { + return tx.Tx.SignedBytes(), nil +} + +func (tx *GossipAtomicTx) Unmarshal(bytes []byte) error { + atomicTx, err := ExtractAtomicTx(bytes, Codec) + tx.Tx = atomicTx + + return err +} + +func NewGossipEthTxPool(mempool *txpool.TxPool) (*GossipEthTxPool, error) { + bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + if err != nil { + return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) + } + + return &GossipEthTxPool{ + mempool: mempool, + pendingTxs: make(chan core.NewTxsEvent), + bloom: bloom, + }, nil +} + +type GossipEthTxPool struct { + mempool *txpool.TxPool + pendingTxs chan core.NewTxsEvent + + bloom *gossip.BloomFilter + lock sync.RWMutex +} + +func (g *GossipEthTxPool) Subscribe(ctx context.Context) { + g.mempool.SubscribeNewTxsEvent(g.pendingTxs) + + for { + select { + case <-ctx.Done(): + log.Debug("shutting down subscription") + return + case pendingTxs := <-g.pendingTxs: + g.lock.Lock() + for _, pendingTx := range pendingTxs.Txs { + tx := &GossipEthTx{Tx: pendingTx} + g.bloom.Add(tx) + reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipMaxFalsePositiveRate) + if err != nil { + log.Error("failed to reset bloom filter", "err", err) + continue + } + + if reset { + log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + g.mempool.IteratePending(func(tx *types.Transaction) bool { + g.bloom.Add(&GossipEthTx{Tx: pendingTx}) + return true + }) + } + } + g.lock.Unlock() + } + } +} + +// Add enqueues the transaction to the mempool. Subscribe should be called +// to receive an event if tx is actually added to the mempool or not. +func (g *GossipEthTxPool) Add(tx *GossipEthTx) error { + return g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0] +} + +func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) { + g.mempool.IteratePending(func(tx *types.Transaction) bool { + return f(&GossipEthTx{Tx: tx}) + }) +} + +func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) { + g.lock.RLock() + defer g.lock.RUnlock() + + bloom, err := g.bloom.Bloom.MarshalBinary() + salt := g.bloom.Salt + + return bloom, salt[:], err +} + +type GossipEthTx struct { + Tx *types.Transaction +} + +func (tx *GossipEthTx) GetID() ids.ID { + return ids.ID(tx.Tx.Hash()) +} + +func (tx *GossipEthTx) Marshal() ([]byte, error) { + return tx.Tx.MarshalBinary() +} + +func (tx *GossipEthTx) Unmarshal(bytes []byte) error { + tx.Tx = &types.Transaction{} + return tx.Tx.UnmarshalBinary(bytes) +} diff --git a/plugin/evm/gossip_mempool_test.go b/plugin/evm/gossip_mempool_test.go new file mode 100644 index 0000000000..b206219295 --- /dev/null +++ b/plugin/evm/gossip_mempool_test.go @@ -0,0 +1,119 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "testing" + + "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" + "github.com/ava-labs/avalanchego/vms/components/verify" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" +) + +func TestGossipAtomicTxMarshal(t *testing.T) { + require := require.New(t) + + expected := &GossipAtomicTx{ + Tx: &Tx{ + UnsignedAtomicTx: &UnsignedImportTx{}, + Creds: []verify.Verifiable{}, + }, + } + + key0 := testKeys[0] + require.NoError(expected.Tx.Sign(Codec, [][]*secp256k1.PrivateKey{{key0}})) + + bytes, err := expected.Marshal() + require.NoError(err) + + actual := &GossipAtomicTx{} + require.NoError(actual.Unmarshal(bytes)) + + require.NoError(err) + require.Equal(expected.GetID(), actual.GetID()) +} + +func TestAtomicMempoolIterate(t *testing.T) { + txs := []*GossipAtomicTx{ + { + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + }, + { + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + }, + } + + tests := []struct { + name string + add []*GossipAtomicTx + f func(tx *GossipAtomicTx) bool + possibleValues []*GossipAtomicTx + expectedLen int + }{ + { + name: "func matches nothing", + add: txs, + f: func(*GossipAtomicTx) bool { + return false + }, + possibleValues: nil, + }, + { + name: "func matches all", + add: txs, + f: func(*GossipAtomicTx) bool { + return true + }, + possibleValues: txs, + expectedLen: 2, + }, + { + name: "func matches subset", + add: txs, + f: func(tx *GossipAtomicTx) bool { + return tx.Tx == txs[0].Tx + }, + possibleValues: txs, + expectedLen: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + m, err := NewMempool(ids.Empty, 10) + require.NoError(err) + + for _, add := range tt.add { + require.NoError(m.Add(add)) + } + + matches := make([]*GossipAtomicTx, 0) + f := func(tx *GossipAtomicTx) bool { + match := tt.f(tx) + + if match { + matches = append(matches, tx) + } + + return match + } + + m.Iterate(f) + + require.Len(matches, tt.expectedLen) + require.Subset(tt.possibleValues, matches) + }) + } +} diff --git a/plugin/evm/gossiper_atomic_gossiping_test.go b/plugin/evm/gossiper_atomic_gossiping_test.go index 73e3ce17b3..6ded11967b 100644 --- a/plugin/evm/gossiper_atomic_gossiping_test.go +++ b/plugin/evm/gossiper_atomic_gossiping_test.go @@ -11,8 +11,8 @@ import ( "time" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/set" - "github.com/stretchr/testify/assert" "github.com/ava-labs/coreth/plugin/evm/message" @@ -22,10 +22,11 @@ import ( func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) { assert := assert.New(t) - _, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "") + _, vm, _, sharedMemory, sender := GenesisVM(t, false, "", "", "") defer func() { assert.NoError(vm.Shutdown(context.Background())) }() + assert.NoError(vm.Connected(context.Background(), ids.GenerateTestNodeID(), nil)) // Create conflicting transactions importTxs := createImportTxOptions(t, vm, sharedMemory) @@ -56,12 +57,15 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) { return nil } + assert.NoError(vm.SetState(context.Background(), snow.NormalOp)) + // Optimistically gossip raw tx assert.NoError(vm.issueTx(tx, true /*=local*/)) time.Sleep(500 * time.Millisecond) gossipedLock.Lock() assert.Equal(1, gossiped) gossipedLock.Unlock() + assert.True(vm.mempool.bloom.Has(&GossipAtomicTx{Tx: tx})) // Test hash on retry assert.NoError(vm.gossiper.GossipAtomicTxs([]*Tx{tx})) diff --git a/plugin/evm/mempool.go b/plugin/evm/mempool.go index 25b67298f4..8c1c56474e 100644 --- a/plugin/evm/mempool.go +++ b/plugin/evm/mempool.go @@ -10,6 +10,8 @@ import ( "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/coreth/metrics" "github.com/ethereum/go-ethereum/log" ) @@ -69,12 +71,19 @@ type Mempool struct { txHeap *txHeap // utxoSpenders maps utxoIDs to the transaction consuming them in the mempool utxoSpenders map[ids.ID]*Tx + // bloom is a bloom filter containing the txs in the mempool + bloom *gossip.BloomFilter metrics *mempoolMetrics } // NewMempool returns a Mempool with [maxSize] -func NewMempool(AVAXAssetID ids.ID, maxSize int) *Mempool { +func NewMempool(AVAXAssetID ids.ID, maxSize int) (*Mempool, error) { + bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + if err != nil { + return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) + } + return &Mempool{ AVAXAssetID: AVAXAssetID, issuedTxs: make(map[ids.ID]*Tx), @@ -84,8 +93,9 @@ func NewMempool(AVAXAssetID ids.ID, maxSize int) *Mempool { txHeap: newTxHeap(maxSize), maxSize: maxSize, utxoSpenders: make(map[ids.ID]*Tx), + bloom: bloom, metrics: newMempoolMetrics(), - } + }, nil } // Len returns the number of transactions in the mempool @@ -125,6 +135,10 @@ func (m *Mempool) atomicTxGasPrice(tx *Tx) (uint64, error) { return burned / gasUsed, nil } +func (m *Mempool) Add(tx *GossipAtomicTx) error { + return m.AddTx(tx.Tx) +} + // Add attempts to add [tx] to the mempool and returns an error if // it could not be addeed to the mempool. func (m *Mempool) AddTx(tx *Tx) error { @@ -259,6 +273,21 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { for utxoID := range utxoSet { m.utxoSpenders[utxoID] = tx } + + m.bloom.Add(&GossipAtomicTx{Tx: tx}) + reset, err := gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipMaxFalsePositiveRate) + if err != nil { + return err + } + + if reset { + log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + for _, pendingTx := range m.txHeap.minHeap.items { + m.bloom.Add(&GossipAtomicTx{Tx: pendingTx.tx}) + } + } + // When adding [tx] to the mempool make sure that there is an item in Pending // to signal the VM to produce a block. Note: if the VM's buildStatus has already // been set to something other than [dontBuild], this will be ignored and won't be @@ -266,9 +295,31 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { // and CancelCurrentTx. m.newTxs = append(m.newTxs, tx) m.addPending() + return nil } +func (m *Mempool) Iterate(f func(tx *GossipAtomicTx) bool) { + m.lock.RLock() + defer m.lock.RUnlock() + + for _, item := range m.txHeap.maxHeap.items { + if !f(&GossipAtomicTx{Tx: item.tx}) { + return + } + } +} + +func (m *Mempool) GetFilter() ([]byte, []byte, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + bloom, err := m.bloom.Bloom.MarshalBinary() + salt := m.bloom.Salt + + return bloom, salt[:], err +} + // NextTx returns a transaction to be issued from the mempool. func (m *Mempool) NextTx() (*Tx, bool) { m.lock.Lock() diff --git a/plugin/evm/mempool_test.go b/plugin/evm/mempool_test.go new file mode 100644 index 0000000000..6d719a0ee2 --- /dev/null +++ b/plugin/evm/mempool_test.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/stretchr/testify/require" +) + +func TestMempoolAddTx(t *testing.T) { + require := require.New(t) + m, err := NewMempool(ids.Empty, 5_000) + require.NoError(err) + + txs := make([]*GossipAtomicTx, 0) + for i := 0; i < 3_000; i++ { + tx := &GossipAtomicTx{ + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + } + + txs = append(txs, tx) + require.NoError(m.Add(tx)) + } + + for _, tx := range txs { + require.True(m.bloom.Has(tx)) + } +} diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go new file mode 100644 index 0000000000..890a9e2d06 --- /dev/null +++ b/plugin/evm/tx_gossip_test.go @@ -0,0 +1,254 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "context" + "math/big" + "sync" + "testing" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils" + "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "go.uber.org/mock/gomock" + + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/params" +) + +func TestEthTxGossip(t *testing.T) { + require := require.New(t) + + // set up prefunded address + importAmount := uint64(1_000_000_000) + issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONLatest, "", "", map[ids.ShortID]uint64{ + testShortIDAddrs[0]: importAmount, + }) + defer func() { + require.NoError(vm.Shutdown(context.Background())) + }() + + importAccepted := make(chan core.NewTxPoolHeadEvent) + vm.txPool.SubscribeNewHeadEvent(importAccepted) + + importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + require.NoError(err) + require.NoError(vm.issueTx(importTx, true)) + <-issuer + + blk, err := vm.BuildBlock(context.Background()) + require.NoError(err) + + require.NoError(blk.Verify(context.Background())) + require.NoError(vm.SetPreference(context.Background(), blk.ID())) + require.NoError(blk.Accept(context.Background())) + <-importAccepted + + // sender for the peer requesting gossip from [vm] + ctrl := gomock.NewController(t) + peerSender := common.NewMockSender(ctrl) + router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") + + // we're only making client requests, so we don't need a server handler + client, err := router.RegisterAppProtocol(ethTxGossipProtocol, nil, nil) + require.NoError(err) + + emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + require.NoError(err) + emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() + require.NoError(err) + request := &sdk.PullGossipRequest{ + Filter: emptyBloomFilterBytes, + Salt: utils.RandomBytes(32), + } + + requestBytes, err := proto.Marshal(request) + require.NoError(err) + + wg := &sync.WaitGroup{} + + requestingNodeID := ids.GenerateTestNodeID() + peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + go func() { + require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) + }() + }).AnyTimes() + + sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { + go func() { + require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + }() + return nil + } + + // we only accept gossip requests from validators + mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) + require.True(ok) + mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. + wg.Add(1) + onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &sdk.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Empty(response.Gossip) + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() + + // Issue a tx to the VM + address := testEthAddrs[0] + key := testKeys[0].ToECDSA() + tx := types.NewTransaction(0, address, big.NewInt(10), 100_000, big.NewInt(params.LaunchMinGasPrice), nil) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), key) + require.NoError(err) + + errs := vm.txPool.AddLocals([]*types.Transaction{signedTx}) + require.Len(errs, 1) + require.Nil(errs[0]) + + // wait so we aren't throttled by the vm + time.Sleep(5 * time.Second) + + // Ask the VM for new transactions. We should get the newly issued tx. + wg.Add(1) + onResponse = func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &sdk.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Len(response.Gossip, 1) + + gotTx := &GossipEthTx{} + require.NoError(gotTx.Unmarshal(response.Gossip[0])) + require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) + + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() +} + +func TestAtomicTxGossip(t *testing.T) { + require := require.New(t) + + // set up prefunded address + importAmount := uint64(1_000_000_000) + issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase0, "", "", map[ids.ShortID]uint64{ + testShortIDAddrs[0]: importAmount, + }) + + defer func() { + require.NoError(vm.Shutdown(context.Background())) + }() + + // sender for the peer requesting gossip from [vm] + ctrl := gomock.NewController(t) + peerSender := common.NewMockSender(ctrl) + router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") + + // we're only making client requests, so we don't need a server handler + client, err := router.RegisterAppProtocol(atomicTxGossipProtocol, nil, nil) + require.NoError(err) + + emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + require.NoError(err) + bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary() + require.NoError(err) + request := &sdk.PullGossipRequest{ + Filter: bloomBytes, + Salt: emptyBloomFilter.Salt[:], + } + requestBytes, err := proto.Marshal(request) + require.NoError(err) + + requestingNodeID := ids.GenerateTestNodeID() + wg := &sync.WaitGroup{} + peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + go func() { + require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) + }() + }).AnyTimes() + + sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { + go func() { + require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + }() + return nil + } + + // we only accept gossip requests from validators + mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) + require.True(ok) + mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. + wg.Add(1) + onResponse := func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &sdk.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Empty(response.Gossip) + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() + + // issue a new tx to the vm + importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + require.NoError(err) + + require.NoError(vm.issueTx(importTx, true /*=local*/)) + <-issuer + + // wait so we aren't throttled by the vm + time.Sleep(5 * time.Second) + + // Ask the VM for new transactions. We should get the newly issued tx. + wg.Add(1) + onResponse = func(_ context.Context, nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &sdk.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Len(response.Gossip, 1) + + gotTx := &GossipAtomicTx{} + require.NoError(gotTx.Unmarshal(response.Gossip[0])) + require.Equal(importTx.InputUTXOs(), gotTx.Tx.InputUTXOs()) + + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() +} diff --git a/plugin/evm/tx_test.go b/plugin/evm/tx_test.go index 3438f16296..769690d272 100644 --- a/plugin/evm/tx_test.go +++ b/plugin/evm/tx_test.go @@ -150,7 +150,9 @@ func executeTxTest(t *testing.T, test atomicTxTest) { // If this test simulates processing txs during bootstrapping (where some verification is skipped), // initialize the block building goroutines normally initialized in SetState(snow.NormalOps). // This ensures that the VM can build a block correctly during the test. - vm.initBlockBuilding() + if err := vm.initBlockBuilding(); err != nil { + t.Fatal(err) + } } if err := vm.issueTx(tx, true /*=local*/); err != nil { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index ecc60e9458..ab5e523079 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -18,6 +18,7 @@ import ( avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/coreth/consensus/dummy" corethConstants "github.com/ava-labs/coreth/constants" @@ -128,6 +129,41 @@ const ( bytesToIDCacheSize = 5 * units.MiB targetAtomicTxsSize = 40 * units.KiB + + // p2p app protocols + ethTxGossipProtocol = 0x0 + atomicTxGossipProtocol = 0x1 + + // gossip constants + txGossipBloomMaxItems = 8 * 1024 + txGossipBloomFalsePositiveRate = 0.01 + txGossipMaxFalsePositiveRate = 0.05 + txGossipTargetResponseSize = 20 * units.KiB + maxValidatorSetStaleness = time.Minute + throttlingPeriod = 10 * time.Second + throttlingLimit = 2 +) + +var ( + ethTxGossipConfig = gossip.Config{ + Namespace: "eth_tx_gossip", + Frequency: 10 * time.Second, + PollSize: 10, + } + ethTxGossipHandlerConfig = gossip.HandlerConfig{ + Namespace: "eth_tx_gossip", + TargetResponseSize: txGossipTargetResponseSize, + } + + atomicTxGossipConfig = gossip.Config{ + Namespace: "atomic_tx_gossip", + Frequency: 10 * time.Second, + PollSize: 10, + } + atomicTxGossipHandlerConfig = gossip.HandlerConfig{ + Namespace: "atomic_tx_gossip", + TargetResponseSize: txGossipTargetResponseSize, + } ) // Define the API endpoints for the VM @@ -211,6 +247,8 @@ func init() { // VM implements the snowman.ChainVM interface type VM struct { ctx *snow.Context + // [cancel] may be nil until [snow.NormalOp] starts + cancel context.CancelFunc // *chain.State helps to implement the VM interface by wrapping blocks // with an efficient caching layer. *chain.State @@ -277,7 +315,8 @@ type VM struct { client peer.NetworkClient networkCodec codec.Manager - router *p2p.Router + validators *p2p.Validators + router *p2p.Router // Metrics multiGatherer avalanchegoMetrics.MultiGatherer @@ -503,13 +542,17 @@ func (vm *VM) Initialize( vm.codec = Codec // TODO: read size from settings - vm.mempool = NewMempool(chainCtx.AVAXAssetID, defaultMempoolSize) + vm.mempool, err = NewMempool(chainCtx.AVAXAssetID, defaultMempoolSize) + if err != nil { + return fmt.Errorf("failed to initialize mempool: %w", err) + } if err := vm.initializeMetrics(); err != nil { return err } // initialize peer network + vm.validators = p2p.NewValidators(vm.ctx.Log, vm.ctx.SubnetID, vm.ctx.ValidatorState, maxValidatorSetStaleness) vm.router = p2p.NewRouter(vm.ctx.Log, appSender, vm.sdkMetrics, "p2p") vm.networkCodec = message.Codec vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) @@ -942,7 +985,9 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { return vm.fx.Bootstrapping() case snow.NormalOp: // Initialize goroutines related to block building once we enter normal operation as there is no need to handle mempool gossip before this point. - vm.initBlockBuilding() + if err := vm.initBlockBuilding(); err != nil { + return fmt.Errorf("failed to initialize block building: %w", err) + } vm.bootstrapped = true return vm.fx.Bootstrapped() default: @@ -951,13 +996,101 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { } // initBlockBuilding starts goroutines to manage block building -func (vm *VM) initBlockBuilding() { +func (vm *VM) initBlockBuilding() error { + ctx, cancel := context.WithCancel(context.TODO()) + vm.cancel = cancel + // NOTE: gossip network must be initialized first otherwise ETH tx gossip will not work. gossipStats := NewGossipStats() vm.gossiper = vm.createGossiper(gossipStats) vm.builder = vm.NewBlockBuilder(vm.toEngine) vm.builder.awaitSubmittedTxs() vm.Network.SetGossipHandler(NewGossipHandler(vm, gossipStats)) + + ethTxPool, err := NewGossipEthTxPool(vm.txPool) + if err != nil { + return err + } + vm.shutdownWg.Add(1) + go func() { + ethTxPool.Subscribe(ctx) + vm.shutdownWg.Done() + }() + + var ( + ethTxGossipHandler p2p.Handler + atomicTxGossipHandler p2p.Handler + ) + + ethTxGossipHandler, err = gossip.NewHandler[*GossipEthTx](ethTxPool, ethTxGossipHandlerConfig, vm.sdkMetrics) + if err != nil { + return err + } + ethTxGossipHandler = &p2p.ValidatorHandler{ + ValidatorSet: vm.validators, + Handler: &p2p.ThrottlerHandler{ + Throttler: p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), + Handler: ethTxGossipHandler, + }, + } + ethTxGossipClient, err := vm.router.RegisterAppProtocol(ethTxGossipProtocol, ethTxGossipHandler, vm.validators) + if err != nil { + return err + } + + atomicTxGossipHandler, err = gossip.NewHandler[*GossipAtomicTx](vm.mempool, atomicTxGossipHandlerConfig, vm.sdkMetrics) + if err != nil { + return err + } + + atomicTxGossipHandler = &p2p.ValidatorHandler{ + ValidatorSet: vm.validators, + Handler: &p2p.ThrottlerHandler{ + Throttler: p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), + Handler: atomicTxGossipHandler, + }, + } + + atomicTxGossipClient, err := vm.router.RegisterAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler, vm.validators) + if err != nil { + return err + } + + ethTxGossiper, err := gossip.NewGossiper[GossipEthTx, *GossipEthTx]( + ethTxGossipConfig, + vm.ctx.Log, + ethTxPool, + ethTxGossipClient, + vm.sdkMetrics, + ) + if err != nil { + return err + } + + vm.shutdownWg.Add(1) + go func() { + ethTxGossiper.Gossip(ctx) + vm.shutdownWg.Done() + }() + + atomicTxGossiper, err := gossip.NewGossiper[GossipAtomicTx, *GossipAtomicTx]( + atomicTxGossipConfig, + vm.ctx.Log, + vm.mempool, + atomicTxGossipClient, + vm.sdkMetrics, + ) + if err != nil { + return err + } + + vm.shutdownWg.Add(1) + go func() { + atomicTxGossiper.Gossip(ctx) + vm.shutdownWg.Done() + }() + + return nil } // setAppRequestHandlers sets the request handlers for the VM to serve state sync @@ -995,6 +1128,9 @@ func (vm *VM) Shutdown(context.Context) error { if vm.ctx == nil { return nil } + if vm.cancel != nil { + vm.cancel() + } vm.Network.Shutdown() if err := vm.StateSyncClient.Shutdown(); err != nil { log.Error("error stopping state syncer", "err", err) @@ -1342,7 +1478,7 @@ func (vm *VM) issueTx(tx *Tx, local bool) error { } return err } - // NOTE: Gossiping of the issued [Tx] is handled in [AddTx] + return nil }