Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add p2p.Network component #2283

Merged
merged 39 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c073533
p2p network
joshua-kim Nov 7, 2023
ec6b9f9
add test
joshua-kim Nov 9, 2023
b84be08
nit
joshua-kim Nov 9, 2023
74765cb
nit
joshua-kim Nov 9, 2023
c54c7d5
go mod
joshua-kim Nov 9, 2023
f61113e
refactor validators
joshua-kim Nov 15, 2023
f377a8b
nit
joshua-kim Nov 15, 2023
2995647
nit
joshua-kim Nov 15, 2023
accd487
nit
joshua-kim Nov 15, 2023
be9e25a
nit
joshua-kim Nov 15, 2023
e1b726d
Merge branch 'dev' into network
joshua-kim Nov 17, 2023
f35fc75
nit
joshua-kim Nov 17, 2023
bc0c84a
Merge branch 'dev' into network
joshua-kim Nov 17, 2023
15a7af6
Merge branch 'dev' into network
joshua-kim Nov 21, 2023
2627182
Update network/p2p/validators.go
joshua-kim Nov 28, 2023
5b1d64a
Update network/p2p/network.go
joshua-kim Nov 28, 2023
f62f91a
Update network/p2p/network_test.go
joshua-kim Nov 28, 2023
4854748
Update network/p2p/network_test.go
joshua-kim Nov 28, 2023
d0440e8
Update network/p2p/validators_test.go
joshua-kim Nov 28, 2023
35ef6fe
unexport clientOptions
joshua-kim Nov 28, 2023
97bf26f
fix
joshua-kim Nov 28, 2023
e741d12
fix bug
joshua-kim Nov 28, 2023
1568dc7
nit
joshua-kim Nov 28, 2023
b6565dd
nit
joshua-kim Nov 28, 2023
438a0ab
nit
joshua-kim Nov 28, 2023
7c4a2af
Merge branch 'dev' into network
joshua-kim Nov 28, 2023
3cc6218
fix
joshua-kim Nov 28, 2023
af2740d
nit
joshua-kim Nov 28, 2023
e2781cf
nit
joshua-kim Nov 28, 2023
f512ead
nit
joshua-kim Nov 28, 2023
741f612
nti
joshua-kim Nov 28, 2023
bfa0638
Update network/p2p/validators.go
joshua-kim Nov 29, 2023
4701b31
nit
joshua-kim Nov 29, 2023
3a8cca5
Merge branch 'dev' into network
joshua-kim Nov 29, 2023
ee0eb08
nit
joshua-kim Nov 29, 2023
5fde830
tidy
joshua-kim Nov 29, 2023
6870966
Merge branch 'dev' into network
StephenButtolph Nov 29, 2023
7295103
Update coreth
StephenButtolph Nov 30, 2023
521f8c8
Merge branch 'dev' into network
StephenButtolph Nov 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.9-rc.5
github.com/ava-labs/coreth v0.12.9-rc.7
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.9-rc.5 h1:xYBgNm1uOPfUdUNm8+fS8ellHnEd4qfFNb6uZHo9tqI=
github.com/ava-labs/coreth v0.12.9-rc.5/go.mod h1:rECKQfGFDeodrwGPlJSvFUJDbVr30jSMIVjQLi6pNX4=
github.com/ava-labs/coreth v0.12.9-rc.7 h1:AlCmXnrJwo0NxlEXQHysQgRQSCA14PZW6iyJmeVYB34=
github.com/ava-labs/coreth v0.12.9-rc.7/go.mod h1:yrf2vEah4Fgj6sJ4UpHewo4DLolwdpf2bJuLRT80PGw=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
7 changes: 3 additions & 4 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type CrossChainAppResponseCallback func(
type Client struct {
handlerID uint64
handlerPrefix []byte
router *Router
router *router
sender common.AppSender
// nodeSampler is used to select nodes to route AppRequestAny to
nodeSampler NodeSampler
options *clientOptions
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
Expand All @@ -56,7 +55,7 @@ func (c *Client) AppRequestAny(
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
sampled := c.nodeSampler.Sample(ctx, 1)
sampled := c.options.nodeSampler.Sample(ctx, 1)
if len(sampled) != 1 {
return ErrNoPeers
}
Expand Down
42 changes: 19 additions & 23 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
Expand Down Expand Up @@ -117,10 +115,9 @@ func TestGossiperGossip(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

responseSender := common.NewMockSender(ctrl)
responseRouter := p2p.NewRouter(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseSender := &common.SenderTest{}
responseNetwork := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseBloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
responseSet := testSet{
Expand All @@ -130,31 +127,30 @@ func TestGossiperGossip(t *testing.T) {
for _, item := range tt.responder {
require.NoError(responseSet.Add(item))
}
peers := &p2p.Peers{}
require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil))

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseRouter.RegisterAppProtocol(0x0, handler, peers)
_, err = responseNetwork.NewAppProtocol(0x0, handler)
require.NoError(err)

requestSender := common.NewMockSender(ctrl)
requestRouter := p2p.NewRouter(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")

gossiped := make(chan struct{})
requestSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) {
requestSender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error {
go func() {
require.NoError(responseRouter.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
require.NoError(responseNetwork.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
}()
}).AnyTimes()
return nil
},
}

responseSender.EXPECT().
SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) {
require.NoError(requestRouter.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
}).AnyTimes()
requestNetwork := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

gossiped := make(chan struct{})
responseSender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
require.NoError(requestNetwork.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
return nil
}

bloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
Expand All @@ -166,7 +162,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestRouter.RegisterAppProtocol(0x0, nil, peers)
requestClient, err := requestNetwork.NewAppProtocol(0x0, nil)
require.NoError(err)

config := Config{
Expand Down
188 changes: 188 additions & 0 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"encoding/binary"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)

var (
_ validators.Connector = (*Network)(nil)
_ common.AppHandler = (*Network)(nil)
_ NodeSampler = (*peerSampler)(nil)
)

// ClientOption configures Client
type ClientOption interface {
apply(options *clientOptions)
}

type clientOptionFunc func(options *clientOptions)

func (o clientOptionFunc) apply(options *clientOptions) {
o(options)
}

// WithValidatorSampling configures Client.AppRequestAny to sample validators
func WithValidatorSampling(validators *Validators) ClientOption {
return clientOptionFunc(func(options *clientOptions) {
options.nodeSampler = validators
})
}

// clientOptions holds client-configurable values
type clientOptions struct {
// nodeSampler is used to select nodes to route Client.AppRequestAny to
nodeSampler NodeSampler
}

// NewNetwork returns an instance of Network
func NewNetwork(
log logging.Logger,
sender common.AppSender,
metrics prometheus.Registerer,
namespace string,
) *Network {
return &Network{
Peers: &Peers{},
log: log,
sender: sender,
metrics: metrics,
namespace: namespace,
router: newRouter(log, sender, metrics, namespace),
}
}

// Network exposes networking state and supports building p2p application
// protocols
type Network struct {
Peers *Peers

log logging.Logger
sender common.AppSender
metrics prometheus.Registerer
namespace string

router *router
}

func (n *Network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

func (n *Network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

func (n *Network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
return n.router.AppGossip(ctx, nodeID, msg)
}

func (n *Network) CrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
return n.router.CrossChainAppRequest(ctx, chainID, requestID, deadline, request)
}

func (n *Network) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requestID uint32, response []byte) error {
return n.router.CrossChainAppResponse(ctx, chainID, requestID, response)
}

func (n *Network) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, requestID uint32) error {
return n.router.CrossChainAppRequestFailed(ctx, chainID, requestID)
}

func (n *Network) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error {
n.Peers.add(nodeID)
return nil
}

func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
n.Peers.remove(nodeID)
return nil
}

// NewAppProtocol reserves an identifier for an application protocol handler and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
if err := n.router.addHandler(handlerID, handler); err != nil {
return nil, err
}

client := &Client{
handlerID: handlerID,
handlerPrefix: binary.AppendUvarint(nil, handlerID),
sender: n.sender,
router: n.router,
options: &clientOptions{
nodeSampler: &peerSampler{
peers: n.Peers,
},
},
}

for _, option := range options {
option.apply(client.options)
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

return client, nil
}

// Peers contains metadata about the current set of connected peers
type Peers struct {
lock sync.RWMutex
set set.SampleableSet[ids.NodeID]
}

func (p *Peers) add(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.set.Add(nodeID)
}

func (p *Peers) remove(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.set.Remove(nodeID)
}

func (p *Peers) has(nodeID ids.NodeID) bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Contains(nodeID)
}

// Sample returns a pseudo-random sample of up to limit Peers
func (p *Peers) Sample(limit int) []ids.NodeID {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Sample(limit)
}

type peerSampler struct {
peers *Peers
}

func (p peerSampler) Sample(_ context.Context, limit int) []ids.NodeID {
return p.peers.Sample(limit)
}
Loading
Loading