diff --git a/go.mod b/go.mod index c5f2f3898e..49825ad744 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( github.com/VictoriaMetrics/fastcache v1.10.0 - github.com/ava-labs/avalanchego v1.10.16-rc.0 + github.com/ava-labs/avalanchego v1.10.17-rc.9 github.com/cespare/cp v0.1.0 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 github.com/davecgh/go-spew v1.1.1 @@ -33,12 +33,11 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.12.0 github.com/status-im/keycard-go v0.2.0 - github.com/stretchr/testify v1.8.1 + github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa go.uber.org/goleak v1.2.1 - go.uber.org/mock v0.2.0 golang.org/x/crypto v0.14.0 golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df golang.org/x/sync v0.3.0 @@ -128,9 +127,9 @@ require ( go.opentelemetry.io/otel/sdk v1.11.0 // indirect go.opentelemetry.io/otel/trace v1.11.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.8.0 // indirect - go.uber.org/zap v1.24.0 // indirect + go.uber.org/mock v0.2.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/term v0.13.0 // indirect gonum.org/v1/gonum v0.11.0 // indirect diff --git a/go.sum b/go.sum index 4acbaf03f4..6ce7fcdbf9 100644 --- a/go.sum +++ b/go.sum @@ -55,10 +55,11 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/avalanchego v1.10.16-rc.0 h1:I2k3hKjSr7PH5KKNxFeYiZ1PfyMlYvPXJZLnYU8uGsQ= -github.com/ava-labs/avalanchego v1.10.16-rc.0/go.mod h1:fHTzxKZOMdM0n4EEXDDR0V3Ieb/Jnz7PM8zAsJRsh2U= +github.com/ava-labs/avalanchego v1.10.17-rc.0.0.20231129204442-3a8cca5f31b9 h1:IDbRSp4QU5gcsC31uxOMc1089nTDhZpr6c+z7pHVNks= +github.com/ava-labs/avalanchego v1.10.17-rc.0.0.20231129204442-3a8cca5f31b9/go.mod h1:JuMiDdOmbTZfZty/RFszDPpKd1KEy9aivjxbcc1TPZo= +github.com/ava-labs/avalanchego v1.10.17-rc.9 h1:qMDE56M33lO8nkXMPmycN8uIT1BL42giR46f8j7kbhg= +github.com/ava-labs/avalanchego v1.10.17-rc.9/go.mod h1:GW0zr0OJfy9Vbki1ChOm47jv0gK+2tnBXP2NpzAiVzE= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo= @@ -549,7 +550,6 @@ github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobt github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -558,8 +558,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI= github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs= github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4= @@ -627,17 +627,14 @@ go.opentelemetry.io/otel/trace v1.11.0/go.mod h1:nyYjis9jy0gytE9LXGU+/m1sHTKbRY0 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU= go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= -go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/peer/network.go b/peer/network.go index 9110a76625..f59783b751 100644 --- a/peer/network.go +++ b/peer/network.go @@ -77,6 +77,10 @@ type Network interface { // TrackBandwidth should be called for each valid request with the bandwidth // (length of response divided by request time), and with 0 if the response is invalid. TrackBandwidth(nodeID ids.NodeID, bandwidth float64) + + // NewAppProtocol reserves a protocol identifier and returns a corresponding + // client to send messages with + NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error) } // network is an implementation of Network that processes message requests for @@ -88,16 +92,16 @@ type network struct { outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests - router *p2p.Router // handles messages being sent to the generic networking SDK - appSender common.AppSender // avalanchego AppSender for sending messages - codec codec.Manager // Codec used for parsing messages - crossChainCodec codec.Manager // Codec used for parsing cross chain messages - appRequestHandler message.RequestHandler // maps request type => handler - crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler - gossipHandler message.GossipHandler // maps gossip type => handler - peers *peerTracker // tracking of peers & bandwidth - appStats stats.RequestHandlerStats // Provide request handler metrics - crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics + network *p2p.Network + appSender common.AppSender // avalanchego AppSender for sending messages + codec codec.Manager // Codec used for parsing messages + crossChainCodec codec.Manager // Codec used for parsing cross chain messages + appRequestHandler message.RequestHandler // maps request type => handler + crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler + gossipHandler message.GossipHandler // maps gossip type => handler + peers *peerTracker // tracking of peers & bandwidth + appStats stats.RequestHandlerStats // Provide request handler metrics + crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics // Set to true when Shutdown is called, after which all operations on this // struct are no-ops. @@ -110,9 +114,8 @@ type network struct { closed utils.Atomic[bool] } -func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { +func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { return &network{ - router: router, appSender: appSender, codec: codec, crossChainCodec: crossChainCodec, @@ -120,6 +123,7 @@ func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Mana outstandingRequestHandlers: make(map[uint32]message.ResponseHandler), activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests), activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests), + network: p2pNetwork, gossipHandler: message.NoopMempoolGossipHandler{}, appRequestHandler: message.NoopRequestHandler{}, crossChainRequestHandler: message.NoopCrossChainRequestHandler{}, @@ -326,8 +330,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u var req message.Request if _, err := n.codec.Unmarshal(request, &req); err != nil { - log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) - return n.router.AppRequest(ctx, nodeID, requestID, deadline, request) + log.Debug("forwarding AppRequest to SDK network", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + return n.network.AppRequest(ctx, nodeID, requestID, deadline, request) } bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats) @@ -362,8 +366,8 @@ func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID handler, exists := n.markRequestFulfilled(requestID) if !exists { - log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) - return n.router.AppResponse(ctx, nodeID, requestID, response) + log.Debug("forwarding AppResponse to SDK network", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) + return n.network.AppResponse(ctx, nodeID, requestID, response) } // We must release the slot @@ -383,8 +387,8 @@ func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reque handler, exists := n.markRequestFulfilled(requestID) if !exists { - log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID) - return n.router.AppRequestFailed(ctx, nodeID, requestID) + log.Debug("forwarding AppRequestFailed to SDK network", "nodeID", nodeID, "requestID", requestID) + return n.network.AppRequestFailed(ctx, nodeID, requestID) } // We must release the slot @@ -456,7 +460,7 @@ func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes [] } // Connected adds the given nodeID to the peer list so that it can receive messages -func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { +func (n *network) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error { log.Debug("adding new peer", "nodeID", nodeID) n.lock.Lock() @@ -472,11 +476,11 @@ func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *v } n.peers.Connected(nodeID, nodeVersion) - return nil + return n.network.Connected(ctx, nodeID, nodeVersion) } // Disconnected removes given [nodeID] from the peer list -func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error { +func (n *network) Disconnected(ctx context.Context, nodeID ids.NodeID) error { log.Debug("disconnecting peer", "nodeID", nodeID) n.lock.Lock() defer n.lock.Unlock() @@ -486,7 +490,7 @@ func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error { } n.peers.Disconnected(nodeID) - return nil + return n.network.Disconnected(ctx, nodeID) } // Shutdown disconnects all peers @@ -539,6 +543,10 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) { n.peers.TrackBandwidth(nodeID, bandwidth) } +func (n *network) NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error) { + return n.network.NewAppProtocol(protocol, handler, options...) +} + // invariant: peer/network must use explicitly even request ids. // for this reason, [n.requestID] is initialized as zero and incremented by 2. // This is for backwards-compatibility while the SDK router exists with the diff --git a/peer/network_test.go b/peer/network_test.go index 7f5fe482d5..c077e7736a 100644 --- a/peer/network_test.go +++ b/peer/network_test.go @@ -58,7 +58,7 @@ var ( func TestNetworkDoesNotConnectToItself(t *testing.T) { selfNodeID := ids.GenerateTestNodeID() - n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), nil, nil, nil, selfNodeID, 1, 1) + n := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), nil, nil, nil, selfNodeID, 1, 1) assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion)) assert.EqualValues(t, 0, n.Size()) } @@ -94,7 +94,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() @@ -146,7 +146,7 @@ func TestAppRequestOnCtxCancellation(t *testing.T) { }, } - net := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) requestMessage := HelloRequest{Message: "this is a request"} @@ -198,7 +198,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) @@ -278,7 +278,7 @@ func TestAppRequestOnShutdown(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -327,7 +327,7 @@ func TestAppRequestAnyOnCtxCancellation(t *testing.T) { }, } - net := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) assert.NoError(t, net.Connected( @@ -404,7 +404,7 @@ func TestRequestMinVersion(t *testing.T) { } // passing nil as codec works because the net.AppRequest is never called - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) client := NewNetworkClient(net) requestMessage := TestMessage{Message: "this is a request"} requestBytes, err := message.RequestToBytes(codecManager, requestMessage) @@ -468,7 +468,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) { processingDuration: 500 * time.Millisecond, } - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(requestHandler) nodeID := ids.GenerateTestNodeID() @@ -508,7 +508,7 @@ func TestGossip(t *testing.T) { } gossipHandler := &testGossipHandler{} - clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(gossipHandler) assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -535,7 +535,7 @@ func TestHandleInvalidMessages(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{}) @@ -584,7 +584,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler @@ -624,7 +624,7 @@ func TestCrossChainAppRequest(t *testing.T) { }, } - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -659,7 +659,7 @@ func TestCrossChainAppRequestOnCtxCancellation(t *testing.T) { }, } - net := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net := NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) exampleCrossChainRequest := ExampleCrossChainRequest{ @@ -711,7 +711,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -771,7 +771,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) { } codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewNetwork(logging.NoLog{}, nil, prometheus.NewRegistry(), ""), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) exampleCrossChainRequest := ExampleCrossChainRequest{ @@ -812,7 +812,7 @@ func TestNetworkCrossChainAppRequestAfterShutdown(t *testing.T) { require.NoError(net.SendCrossChainRequest(context.Background(), ids.GenerateTestID(), nil, nil)) } -func TestSDKRouting(t *testing.T) { +func TestNetworkRouting(t *testing.T) { require := require.New(t) sender := &testAppSender{ sendAppRequestFn: func(_ context.Context, s set.Set[ids.NodeID], u uint32, bytes []byte) error { @@ -824,15 +824,15 @@ func TestSDKRouting(t *testing.T) { } protocol := 0 handler := &testSDKHandler{} - router := p2p.NewRouter(logging.NoLog{}, sender, prometheus.NewRegistry(), "") - _, err := router.RegisterAppProtocol(uint64(protocol), handler, nil) + p2pNetwork := p2p.NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + _, err := p2pNetwork.NewAppProtocol(uint64(protocol), handler) require.NoError(err) networkCodec := codec.NewManager(0) crossChainCodec := codec.NewManager(0) network := NewNetwork( - router, + p2pNetwork, nil, networkCodec, crossChainCodec, diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go index 2930839fd2..92bd3951a0 100644 --- a/plugin/evm/tx_gossip_test.go +++ b/plugin/evm/tx_gossip_test.go @@ -23,8 +23,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "google.golang.org/protobuf/proto" "github.com/ava-labs/coreth/core" @@ -61,12 +59,11 @@ func TestEthTxGossip(t *testing.T) { <-txPoolNewHeads // sender for the peer requesting gossip from [vm] - ctrl := gomock.NewController(t) - peerSender := common.NewMockSender(ctrl) - router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") + peerSender := &common.SenderTest{} + router := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") // we're only making client requests, so we don't need a server handler - client, err := router.RegisterAppProtocol(ethTxGossipProtocol, nil, nil) + client, err := router.NewAppProtocol(ethTxGossipProtocol, nil) require.NoError(err) emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) @@ -84,11 +81,12 @@ func TestEthTxGossip(t *testing.T) { wg := &sync.WaitGroup{} requestingNodeID := ids.GenerateTestNodeID() - peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + peerSender.SendAppRequestF = func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error { go func() { require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) }() - }).AnyTimes() + return nil + } sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { go func() { @@ -98,6 +96,7 @@ func TestEthTxGossip(t *testing.T) { } // we only accept gossip requests from validators + require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil)) mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) require.True(ok) mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { @@ -167,12 +166,11 @@ func TestAtomicTxGossip(t *testing.T) { }() // sender for the peer requesting gossip from [vm] - ctrl := gomock.NewController(t) - peerSender := common.NewMockSender(ctrl) - router := p2p.NewRouter(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") + peerSender := &common.SenderTest{} + network := p2p.NewNetwork(logging.NoLog{}, peerSender, prometheus.NewRegistry(), "") // we're only making client requests, so we don't need a server handler - client, err := router.RegisterAppProtocol(atomicTxGossipProtocol, nil, nil) + client, err := network.NewAppProtocol(atomicTxGossipProtocol, nil) require.NoError(err) emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) @@ -188,20 +186,22 @@ func TestAtomicTxGossip(t *testing.T) { requestingNodeID := ids.GenerateTestNodeID() wg := &sync.WaitGroup{} - peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + peerSender.SendAppRequestF = func(ctx context.Context, _ set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error { go func() { require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) }() - }).AnyTimes() + return nil + } sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { go func() { - require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + require.NoError(network.AppResponse(ctx, nodeID, requestID, appResponseBytes)) }() return nil } // we only accept gossip requests from validators + require.NoError(vm.Network.Connected(context.Background(), requestingNodeID, nil)) mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) require.True(ok) mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 027567c6f8..27ebeb1c8f 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -327,7 +327,6 @@ type VM struct { networkCodec codec.Manager validators *p2p.Validators - router *p2p.Router // Metrics multiGatherer avalanchegoMetrics.MultiGatherer @@ -585,10 +584,10 @@ func (vm *VM) Initialize( } // initialize peer network - vm.validators = p2p.NewValidators(vm.ctx.Log, vm.ctx.SubnetID, vm.ctx.ValidatorState, maxValidatorSetStaleness) - vm.router = p2p.NewRouter(vm.ctx.Log, appSender, vm.sdkMetrics, "p2p") + p2pNetwork := p2p.NewNetwork(vm.ctx.Log, appSender, vm.sdkMetrics, "p2p") + vm.validators = p2p.NewValidators(p2pNetwork.Peers, vm.ctx.Log, vm.ctx.SubnetID, vm.ctx.ValidatorState, maxValidatorSetStaleness) vm.networkCodec = message.Codec - vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) + vm.Network = peer.NewNetwork(p2pNetwork, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) vm.client = peer.NewNetworkClient(vm.Network) // initialize warp backend @@ -1084,7 +1083,7 @@ func (vm *VM) initBlockBuilding() error { }, Log: vm.ctx.Log, } - ethTxGossipClient, err := vm.router.RegisterAppProtocol(ethTxGossipProtocol, ethTxGossipHandler, vm.validators) + ethTxGossipClient, err := vm.Network.NewAppProtocol(ethTxGossipProtocol, ethTxGossipHandler, p2p.WithValidatorSampling(vm.validators)) if err != nil { return err } @@ -1104,7 +1103,7 @@ func (vm *VM) initBlockBuilding() error { Log: vm.ctx.Log, } - atomicTxGossipClient, err := vm.router.RegisterAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler, vm.validators) + atomicTxGossipClient, err := vm.Network.NewAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler, p2p.WithValidatorSampling(vm.validators)) if err != nil { return err } diff --git a/scripts/versions.sh b/scripts/versions.sh index 0b7638c3fd..57179c4339 100644 --- a/scripts/versions.sh +++ b/scripts/versions.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash # Don't export them as they're used in the context of other calls -avalanche_version=${AVALANCHE_VERSION:-'v1.10.16-rc.0'} +avalanche_version=${AVALANCHE_VERSION:-'v1.10.17-rc.9'}