Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SDK Gossip #318

Merged
merged 37 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2fdd559
evm gossip
joshua-kim Jun 6, 2023
ca37577
nit
joshua-kim Sep 6, 2023
c93ee73
Update plugin/evm/gossip_mempool.go
joshua-kim Sep 8, 2023
bb513d6
nits
joshua-kim Sep 8, 2023
cdb9649
nit
joshua-kim Sep 8, 2023
33b8a06
nit
joshua-kim Sep 8, 2023
845b93d
nit
joshua-kim Sep 8, 2023
76cc2d2
Update plugin/evm/gossip_mempool.go
joshua-kim Sep 8, 2023
954f60f
Update plugin/evm/gossip_mempool.go
joshua-kim Sep 8, 2023
c83364f
Update plugin/evm/mempool.go
joshua-kim Sep 8, 2023
021f8bf
try go 1.20
joshua-kim Sep 8, 2023
8c56141
Revert "try go 1.20"
joshua-kim Sep 8, 2023
90398eb
Update plugin/evm/gossip_mempool.go
joshua-kim Sep 8, 2023
673f5ab
clean mod cache
joshua-kim Sep 8, 2023
234bdb1
Revert "clean mod cache"
joshua-kim Sep 8, 2023
61a1c16
version
joshua-kim Sep 8, 2023
2898ee5
nits
joshua-kim Sep 8, 2023
248a1ca
nit
joshua-kim Sep 8, 2023
0393cfb
nit
joshua-kim Sep 11, 2023
474a03c
nit
joshua-kim Sep 11, 2023
844ccd2
nit
joshua-kim Sep 11, 2023
db51063
nits
joshua-kim Sep 11, 2023
fbfa372
nit
joshua-kim Sep 11, 2023
ec525c9
nit
joshua-kim Sep 11, 2023
25d0bf3
fix ci
joshua-kim Sep 11, 2023
78698af
fix docker version
joshua-kim Sep 11, 2023
6d8f21e
nit
joshua-kim Sep 11, 2023
d4b722c
update go version in ci
joshua-kim Sep 11, 2023
2094e8d
nit
joshua-kim Sep 11, 2023
fe04ca8
nit
joshua-kim Sep 12, 2023
678f640
Squashed commit of the following:
joshua-kim Sep 12, 2023
d487de1
remove ctx field
joshua-kim Sep 12, 2023
0c5a462
add nil check
joshua-kim Sep 12, 2023
0e34744
oops
joshua-kim Sep 12, 2023
461170e
nit
joshua-kim Sep 12, 2023
5a9a7e7
Squashed commit of the following:
joshua-kim Sep 12, 2023
1ffc8e2
Merge branch 'master' into sdk-gossip
joshua-kim Sep 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,21 @@ func (pool *TxPool) PendingSize() int {
return count
}

// IteratePending iterates over [pool.pending] until [f] returns false.
// The caller must not modify [tx].
func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) {
pool.mu.RLock()
defer pool.mu.RUnlock()

for _, list := range pool.pending {
for _, tx := range list.txs.items {
if !f(tx) {
return
}
}
}
Comment on lines +639 to +645
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to prioritize transactions by nonce or perhaps local addresses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending and items are both maps which have random-ish iteration orders so I'm not super concerned about starvation but it could be a good follow-up

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, the other concern would be if we end up sending only transactions with future nonces and don't include the necessary transaction to make them executable. This is an edge case when we hit the max size, so I don't think this needs to block the PR.

}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.9-rc.4
github.com/ava-labs/avalanchego v1.10.10-rc.1
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -38,12 +38,14 @@ require (
github.com/tyler-smith/go-bip39 v1.1.0
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
go.uber.org/goleak v1.2.1
go.uber.org/mock v0.2.0
golang.org/x/crypto v0.1.0
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
golang.org/x/sync v0.1.0
golang.org/x/sys v0.8.0
golang.org/x/text v0.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
google.golang.org/protobuf v1.30.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

Expand Down Expand Up @@ -126,15 +128,13 @@ require (
go.opentelemetry.io/otel/trace v1.11.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/mock v0.2.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLWHbMfIWkHbMI=
github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw=
github.com/ava-labs/avalanchego v1.10.10-rc.1 h1:dPJISEWqL3tdUShe6RuB8CFuXl3rsH8617sXbLBjkIE=
github.com/ava-labs/avalanchego v1.10.10-rc.1/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
2 changes: 1 addition & 1 deletion peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func TestSDKRouting(t *testing.T) {
protocol := 0
handler := &testSDKHandler{}
router := p2p.NewRouter(logging.NoLog{}, sender)
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
_, err := router.RegisterAppProtocol(uint64(protocol), handler, nil)
require.NoError(err)

networkCodec := codec.NewManager(0)
Expand Down
148 changes: 148 additions & 0 deletions plugin/evm/gossip_mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"context"
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/txpool"
"github.com/ava-labs/coreth/core/types"
)

var (
_ gossip.Gossipable = (*GossipEthTx)(nil)
_ gossip.Gossipable = (*GossipAtomicTx)(nil)
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)
)

type GossipAtomicTx struct {
Tx *Tx
}

func (tx *GossipAtomicTx) GetID() ids.ID {
return tx.Tx.ID()
}

func (tx *GossipAtomicTx) Marshal() ([]byte, error) {
return tx.Tx.SignedBytes(), nil
}

func (tx *GossipAtomicTx) Unmarshal(bytes []byte) error {
result := &Tx{}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

_, err := Codec.Unmarshal(bytes, result)
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
unsignedBytes, err := Codec.Marshal(codecVersion, &result.UnsignedAtomicTx)
if err != nil {
return err
}

result.Initialize(unsignedBytes, bytes)
tx.Tx = result

return nil
}

func NewGossipEthTxPool(mempool *txpool.TxPool) (*GossipEthTxPool, error) {
bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
if err != nil {
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err)
}

return &GossipEthTxPool{
mempool: mempool,
pendingTxs: make(chan core.NewTxsEvent),
bloom: bloom,
}, nil
}

type GossipEthTxPool struct {
mempool *txpool.TxPool
pendingTxs chan core.NewTxsEvent

bloom *gossip.BloomFilter
lock sync.RWMutex
}

func (g *GossipEthTxPool) Subscribe(ctx context.Context) {
g.mempool.SubscribeNewTxsEvent(g.pendingTxs)

for {
select {
case <-ctx.Done():
log.Debug("shutting down subscription")
return
case pendingTxs := <-g.pendingTxs:
g.lock.Lock()
for _, pendingTx := range pendingTxs.Txs {
tx := &GossipEthTx{Tx: pendingTx}
g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio)
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Error("failed to reset bloom filter", "err", err)
continue
}

if reset {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

g.mempool.IteratePending(func(tx *types.Transaction) bool {
g.bloom.Add(&GossipEthTx{Tx: pendingTx})
return true
})
}
}
g.lock.Unlock()
}
}
}

// Add enqueues the transaction to the mempool. Subscribe should be called
// to receive an event if tx is actually added to the mempool or not.
func (g *GossipEthTxPool) Add(tx *GossipEthTx) error {
return g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0]
}

func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) {
g.mempool.IteratePending(func(tx *types.Transaction) bool {
return f(&GossipEthTx{Tx: tx})
})
}

func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) {
g.lock.RLock()
defer g.lock.RUnlock()

bloom, err := g.bloom.Bloom.MarshalBinary()
salt := g.bloom.Salt

return bloom, salt[:], err
}

type GossipEthTx struct {
Tx *types.Transaction
}

func (tx *GossipEthTx) GetID() ids.ID {
return ids.ID(tx.Tx.Hash())
}

func (tx *GossipEthTx) Marshal() ([]byte, error) {
return tx.Tx.MarshalBinary()
}

func (tx *GossipEthTx) Unmarshal(bytes []byte) error {
tx.Tx = &types.Transaction{}
return tx.Tx.UnmarshalBinary(bytes)
}
119 changes: 119 additions & 0 deletions plugin/evm/gossip_mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"testing"

"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/vms/components/verify"
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestGossipAtomicTxMarshal(t *testing.T) {
require := require.New(t)

expected := &GossipAtomicTx{
Tx: &Tx{
UnsignedAtomicTx: &UnsignedImportTx{},
Creds: []verify.Verifiable{},
},
}

key0 := testKeys[0]
require.NoError(expected.Tx.Sign(Codec, [][]*secp256k1.PrivateKey{{key0}}))

bytes, err := expected.Marshal()
require.NoError(err)

actual := &GossipAtomicTx{}
require.NoError(actual.Unmarshal(bytes))

require.NoError(err)
require.Equal(expected.GetID(), actual.GetID())
}

func TestAtomicMempoolIterate(t *testing.T) {
txs := []*GossipAtomicTx{
{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
},
{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
},
}

tests := []struct {
name string
add []*GossipAtomicTx
f func(tx *GossipAtomicTx) bool
possibleValues []*GossipAtomicTx
expectedLen int
}{
{
name: "func matches nothing",
add: txs,
f: func(*GossipAtomicTx) bool {
return false
},
possibleValues: nil,
},
{
name: "func matches all",
add: txs,
f: func(*GossipAtomicTx) bool {
return true
},
possibleValues: txs,
expectedLen: 2,
},
{
name: "func matches subset",
add: txs,
f: func(tx *GossipAtomicTx) bool {
return tx.Tx == txs[0].Tx
},
possibleValues: txs,
expectedLen: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 10)
require.NoError(err)

for _, add := range tt.add {
require.NoError(m.Add(add))
}

matches := make([]*GossipAtomicTx, 0)
f := func(tx *GossipAtomicTx) bool {
match := tt.f(tx)

if match {
matches = append(matches, tx)
}

return match
}

m.Iterate(f)

require.Len(matches, tt.expectedLen)
require.Subset(tt.possibleValues, matches)
})
}
}
8 changes: 6 additions & 2 deletions plugin/evm/gossiper_atomic_gossiping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/set"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/coreth/plugin/evm/message"
Expand All @@ -22,10 +22,11 @@ import (
func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
assert := assert.New(t)

_, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "")
_, vm, _, sharedMemory, sender := GenesisVM(t, false, "", "", "")
defer func() {
assert.NoError(vm.Shutdown(context.Background()))
}()
assert.NoError(vm.Connected(context.Background(), ids.GenerateTestNodeID(), nil))

// Create conflicting transactions
importTxs := createImportTxOptions(t, vm, sharedMemory)
Expand Down Expand Up @@ -56,12 +57,15 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
return nil
}

assert.NoError(vm.SetState(context.Background(), snow.NormalOp))

// Optimistically gossip raw tx
assert.NoError(vm.issueTx(tx, true /*=local*/))
time.Sleep(500 * time.Millisecond)
gossipedLock.Lock()
assert.Equal(1, gossiped)
gossipedLock.Unlock()
assert.True(vm.mempool.bloom.Has(&GossipAtomicTx{Tx: tx}))

// Test hash on retry
assert.NoError(vm.gossiper.GossipAtomicTxs([]*Tx{tx}))
Expand Down
Loading
Loading