From 7f8afae621449a270e50078c556cbcba5cc5e326 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 6 Jun 2023 11:22:36 -0400 Subject: [PATCH] add sdk router to network --- peer/network.go | 42 ++++++++++-------- peer/network_test.go | 100 +++++++++++++++++++++++++++++++++++-------- plugin/evm/vm.go | 6 ++- 3 files changed, 110 insertions(+), 38 deletions(-) diff --git a/peer/network.go b/peer/network.go index 8c658a402f..e2210416a3 100644 --- a/peer/network.go +++ b/peer/network.go @@ -12,6 +12,8 @@ import ( "golang.org/x/sync/semaphore" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ethereum/go-ethereum/log" "github.com/ava-labs/avalanchego/codec" @@ -87,23 +89,25 @@ type network struct { outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests - appSender common.AppSender // avalanchego AppSender for sending messages - codec codec.Manager // Codec used for parsing messages - crossChainCodec codec.Manager // Codec used for parsing cross chain messages - appRequestHandler message.RequestHandler // maps request type => handler - crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler - gossipHandler message.GossipHandler // maps gossip type => handler - peers *peerTracker // tracking of peers & bandwidth - appStats stats.RequestHandlerStats // Provide request handler metrics - crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics + router *p2p.Router + appSender common.AppSender // avalanchego AppSender for sending messages + codec codec.Manager // Codec used for parsing messages + crossChainCodec codec.Manager // Codec used for parsing cross chain messages + appRequestHandler message.RequestHandler // maps request type => handler + crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler + gossipHandler message.GossipHandler // maps gossip type => handler + peers *peerTracker // tracking of peers & bandwidth + appStats stats.RequestHandlerStats // Provide request handler metrics + crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics // Set to true when Shutdown is called, after which all operations on this // struct are no-ops. closed utils.Atomic[bool] } -func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { +func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { return &network{ + router: router, appSender: appSender, codec: codec, crossChainCodec: crossChainCodec, @@ -336,7 +340,9 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u var req message.Request if _, err := n.codec.Unmarshal(request, &req); err != nil { log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) - return nil + + // this might be an sdk request + return n.router.AppRequest(ctx, nodeID, requestID, deadline, request) } bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats) @@ -366,7 +372,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u // Error returned by this function is expected to be treated as fatal by the engine // If [requestID] is not known, this function will emit a log and return a nil error. // If the response handler returns an error it is propagated as a fatal error. -func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { +func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { n.lock.Lock() defer n.lock.Unlock() @@ -378,9 +384,8 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui handler, exists := n.markRequestFulfilled(requestID) if !exists { - // Should never happen since the engine should be managing outstanding requests - log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) - return nil + log.Debug("forwarding unknown AppResponse to sdk", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) + return n.router.AppResponse(ctx, nodeID, requestID, response) } // We must release the slot @@ -395,7 +400,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui // - request times out before a response is provided // error returned by this function is expected to be treated as fatal by the engine // returns error only when the response handler returns an error -func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error { +func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { n.lock.Lock() defer n.lock.Unlock() @@ -407,9 +412,8 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request handler, exists := n.markRequestFulfilled(requestID) if !exists { - // Should never happen since the engine should be managing outstanding requests - log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID) - return nil + log.Debug("forwarding unknown AppRequestFailed to sdk", "nodeID", nodeID, "requestID", requestID) + return n.router.AppRequestFailed(ctx, nodeID, requestID) } // We must release the slot diff --git a/peer/network_test.go b/peer/network_test.go index 3e1c32f492..add57616de 100644 --- a/peer/network_test.go +++ b/peer/network_test.go @@ -12,7 +12,9 @@ import ( "testing" "time" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" ethcommon "github.com/ethereum/go-ethereum/common" @@ -49,11 +51,13 @@ var ( _ message.CrossChainRequest = &ExampleCrossChainRequest{} _ message.CrossChainRequestHandler = &testCrossChainHandler{} + + _ p2p.Handler = &testSDKHandler{} ) func TestNetworkDoesNotConnectToItself(t *testing.T) { selfNodeID := ids.GenerateTestNodeID() - n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1) + n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1) assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion)) assert.EqualValues(t, 0, n.Size()) } @@ -89,7 +93,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() @@ -164,7 +168,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) @@ -244,7 +248,7 @@ func TestAppRequestOnShutdown(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -293,7 +297,7 @@ func TestRequestMinVersion(t *testing.T) { } // passing nil as codec works because the net.AppRequest is never called - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) client := NewNetworkClient(net) requestMessage := TestMessage{Message: "this is a request"} requestBytes, err := message.RequestToBytes(codecManager, requestMessage) @@ -356,7 +360,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) { processingDuration: 500 * time.Millisecond, } - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(requestHandler) nodeID := ids.GenerateTestNodeID() @@ -396,7 +400,7 @@ func TestGossip(t *testing.T) { } gossipHandler := &testGossipHandler{} - clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(gossipHandler) assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -423,7 +427,7 @@ func TestHandleInvalidMessages(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{}) @@ -457,12 +461,11 @@ func TestHandleInvalidMessages(t *testing.T) { assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), garbageResponse)) assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), emptyResponse)) assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), nilResponse)) - assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg)) - assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage)) - assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse)) - assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse)) - assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse)) - assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID)) + assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg)) + assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage)) + assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse)) + assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse)) + assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse)) } func TestNetworkPropagatesRequestHandlerError(t *testing.T) { @@ -473,7 +476,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler @@ -513,7 +516,7 @@ func TestCrossChainAppRequest(t *testing.T) { }, } - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -568,7 +571,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -628,7 +631,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) { } codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) exampleCrossChainRequest := ExampleCrossChainRequest{ @@ -649,6 +652,48 @@ func TestCrossChainRequestOnShutdown(t *testing.T) { require.True(t, called) } +func TestSDKRouting(t *testing.T) { + require := require.New(t) + sender := &testAppSender{ + sendAppRequestFn: func(s set.Set[ids.NodeID], u uint32, bytes []byte) error { + return nil + }, + sendAppResponseFn: func(id ids.NodeID, u uint32, bytes []byte) error { + return nil + }, + } + protocol := 0 + handler := &testSDKHandler{} + router := p2p.NewRouter(logging.NoLog{}, sender) + _, err := router.RegisterAppProtocol(uint64(protocol), handler) + require.NoError(err) + + networkCodec := codec.NewManager(0) + crossChainCodec := codec.NewManager(0) + + network := NewNetwork( + router, + nil, + networkCodec, + crossChainCodec, + ids.EmptyNodeID, + 1, + 1, + ) + + nodeID := ids.GenerateTestNodeID() + foobar := append([]byte{byte(protocol)}, []byte("foobar")...) + err = network.AppRequest(context.Background(), nodeID, 0, time.Time{}, foobar) + require.NoError(err) + require.True(handler.appRequested) + + err = network.AppResponse(context.Background(), ids.GenerateTestNodeID(), 0, foobar) + require.ErrorIs(err, p2p.ErrUnrequestedResponse) + + err = network.AppRequestFailed(context.Background(), nodeID, 0) + require.ErrorIs(err, p2p.ErrUnrequestedResponse) +} + func buildCodec(t *testing.T, types ...interface{}) codec.Manager { codecManager := codec.NewDefaultManager() c := linearcodec.NewDefault() @@ -850,3 +895,22 @@ type testCrossChainHandler struct { func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) { return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"}) } + +type testSDKHandler struct { + appRequested bool +} + +func (t *testSDKHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error { + // TODO implement me + panic("implement me") +} + +func (t *testSDKHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) { + t.appRequested = true + return nil, nil +} + +func (t *testSDKHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) { + // TODO implement me + panic("implement me") +} diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index a62245587d..2e5a3f81b6 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -17,6 +17,7 @@ import ( "time" avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/coreth/consensus/dummy" corethConstants "github.com/ava-labs/coreth/constants" @@ -276,6 +277,8 @@ type VM struct { client peer.NetworkClient networkCodec codec.Manager + router *p2p.Router + // Metrics multiGatherer avalanchegoMetrics.MultiGatherer @@ -506,8 +509,9 @@ func (vm *VM) Initialize( } // initialize peer network + vm.router = p2p.NewRouter(vm.ctx.Log, appSender) vm.networkCodec = message.Codec - vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) + vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) vm.client = peer.NewNetworkClient(vm.Network) if err := vm.initializeChain(lastAcceptedHash); err != nil {