From 2211f58bc2a29ae1323e7fa8c53b3a07eff4336c Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 4 Apr 2024 14:47:42 -0400 Subject: [PATCH 1/5] Remove cancellation for Send*Request messages --- network/p2p/client.go | 20 ++++++++++++++++++-- x/sync/network_client.go | 9 ++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/network/p2p/client.go b/network/p2p/client.go index 80d0118513e3..7fd6e6c35aa0 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -72,6 +72,14 @@ func (c *Client) AppRequest( appRequestBytes []byte, onResponse AppResponseCallback, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppRequest should be non-blocking and any error other than context + // cancellation is unexpected. + // + // This guarantees that the router should never receive an unexpected + // AppResponse. + ctxWithoutCancel := context.WithoutCancel(ctx) + c.router.lock.Lock() defer c.router.lock.Unlock() @@ -87,7 +95,7 @@ func (c *Client) AppRequest( } if err := c.sender.SendAppRequest( - ctx, + ctxWithoutCancel, set.Of(nodeID), requestID, appRequestBytes, @@ -126,6 +134,14 @@ func (c *Client) CrossChainAppRequest( appRequestBytes []byte, onResponse CrossChainAppResponseCallback, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendCrossChainAppRequest should be non-blocking and any error other than + // context cancellation is unexpected. + // + // This guarantees that the router should never receive an unexpected + // CrossChainAppResponse. 
+ ctxWithoutCancel := context.WithoutCancel(ctx) + c.router.lock.Lock() defer c.router.lock.Unlock() @@ -139,7 +155,7 @@ func (c *Client) CrossChainAppRequest( } if err := c.sender.SendCrossChainAppRequest( - ctx, + ctxWithoutCancel, chainID, requestID, PrefixMessage(c.handlerPrefix, appRequestBytes), diff --git a/x/sync/network_client.go b/x/sync/network_client.go index 15f59cc5885a..18530d1c4e7c 100644 --- a/x/sync/network_client.go +++ b/x/sync/network_client.go @@ -286,7 +286,14 @@ func (c *networkClient) sendRequestLocked( // Send an app request to the peer. nodeIDs := set.Of(nodeID) - if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppRequest should be non-blocking and any error other than context + // cancellation is unexpected. + // + // This guarantees that the network should never receive an unexpected + // AppResponse. + ctxWithoutCancel := context.WithoutCancel(ctx) + if err := c.appSender.SendAppRequest(ctxWithoutCancel, nodeIDs, requestID, request); err != nil { c.lock.Unlock() c.log.Fatal("failed to send app request", zap.Stringer("nodeID", nodeID), From 4d5c7ceb891f3cfe80459addf9e121278f115fee Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 4 Apr 2024 15:34:32 -0400 Subject: [PATCH 2/5] Add tests --- network/p2p/network_test.go | 80 +++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/network/p2p/network_test.go b/network/p2p/network_test.go index 73b6ef64d1a2..5339a6eeb315 100644 --- a/network/p2p/network_test.go +++ b/network/p2p/network_test.go @@ -180,6 +180,48 @@ func TestAppRequestResponse(t *testing.T) { <-done } +// Tests that the Client does not provide a cancelled context to the AppSender. 
+func TestAppRequestCancelledContext(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + sentMessages := make(chan []byte, 1) + sender := &common.SenderTest{ + SendAppRequestF: func(ctx context.Context, _ set.Set[ids.NodeID], _ uint32, msgBytes []byte) error { + require.NoError(ctx.Err()) + sentMessages <- msgBytes + return nil + }, + } + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) + client := network.NewClient(handlerID) + + wantResponse := []byte("response") + wantNodeID := ids.GenerateTestNodeID() + done := make(chan struct{}) + + callback := func(_ context.Context, gotNodeID ids.NodeID, gotResponse []byte, err error) { + require.Equal(wantNodeID, gotNodeID) + require.NoError(err) + require.Equal(wantResponse, gotResponse) + + close(done) + } + + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + + want := []byte("request") + require.NoError(client.AppRequest(cancelledCtx, set.Of(wantNodeID), want, callback)) + got := <-sentMessages + require.Equal(handlerPrefix, got[0]) + require.Equal(want, got[1:]) + + require.NoError(network.AppResponse(ctx, wantNodeID, 1, wantResponse)) + <-done +} + // Tests that the Client callback is given an error if the request fails func TestAppRequestFailed(t *testing.T) { require := require.New(t) @@ -241,6 +283,44 @@ func TestCrossChainAppRequestResponse(t *testing.T) { <-done } +// Tests that the Client does not provide a cancelled context to the AppSender. 
+func TestCrossChainAppRequestCancelledContext(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + sentMessages := make(chan []byte, 1) + sender := &common.SenderTest{ + SendCrossChainAppRequestF: func(ctx context.Context, _ ids.ID, _ uint32, msgBytes []byte) { + require.NoError(ctx.Err()) + sentMessages <- msgBytes + }, + } + network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "") + require.NoError(err) + client := network.NewClient(handlerID) + + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + + wantChainID := ids.GenerateTestID() + wantResponse := []byte("response") + done := make(chan struct{}) + + callback := func(_ context.Context, gotChainID ids.ID, gotResponse []byte, err error) { + require.Equal(wantChainID, gotChainID) + require.NoError(err) + require.Equal(wantResponse, gotResponse) + + close(done) + } + + require.NoError(client.CrossChainAppRequest(cancelledCtx, wantChainID, []byte("request"), callback)) + <-sentMessages + + require.NoError(network.CrossChainAppResponse(ctx, wantChainID, 1, wantResponse)) + <-done +} + // Tests that the Client callback is given an error if the request fails func TestCrossChainAppRequestFailed(t *testing.T) { require := require.New(t) From 540c4f6fdfa20461fcbf700dbae878de47dd0bcb Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 4 Apr 2024 16:14:21 -0400 Subject: [PATCH 3/5] Update comments --- snow/engine/common/sender.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/snow/engine/common/sender.go b/snow/engine/common/sender.go index 09389061985e..69b53a899568 100644 --- a/snow/engine/common/sender.go +++ b/snow/engine/common/sender.go @@ -167,11 +167,16 @@ type QuerySender interface { // NetworkAppSender sends VM-level messages to nodes in the network. type NetworkAppSender interface { // Send an application-level request. 
- // A nil return value guarantees that for each nodeID in [nodeIDs], - // the VM corresponding to this AppSender eventually receives either: + // + // The VM corresponding to this AppSender may receive either: // * An AppResponse from nodeID with ID [requestID] // * An AppRequestFailed from nodeID with ID [requestID] - // Exactly one of the above messages will eventually be received per nodeID. + // + // A nil return value guarantees that the VM corresponding to this AppSender + // will eventually receive exactly one of the above messages per nodeID. + // + // A non-nil return value guarantees that the VM corresponding to this + // AppSender will receive at most one of the above messages. SendAppRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error // Send an application-level response to a request. // This response must be in response to an AppRequest that the VM corresponding @@ -192,12 +197,16 @@ type CrossChainAppSender interface { // SendCrossChainAppRequest sends an application-level request to a // specific chain. // - // A nil return value guarantees that the VM corresponding to this - // CrossChainAppSender eventually receives either: + // The VM corresponding to this CrossChainAppSender may receive either: // * A CrossChainAppResponse from [chainID] with ID [requestID] // * A CrossChainAppRequestFailed from [chainID] with ID [requestID] - // Exactly one of the above messages will eventually be received from - // [chainID]. + // + // A nil return value guarantees that the VM corresponding to this + // CrossChainAppSender will eventually receive exactly one of the above + // messages. + // + // A non-nil return value guarantees that the VM corresponding to this + // CrossChainAppSender will receive at most one of the above messages.
SendCrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, appRequestBytes []byte) error // SendCrossChainAppResponse sends an application-level response to a // specific chain From 8e7ddf58d275f3370eb7727043ab3258906938d7 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 4 Apr 2024 16:21:50 -0400 Subject: [PATCH 4/5] Add logs --- network/p2p/client.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/network/p2p/client.go b/network/p2p/client.go index 7fd6e6c35aa0..3a6a5e6af820 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -8,7 +8,10 @@ import ( "errors" "fmt" + "go.uber.org/zap" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/set" ) @@ -100,6 +103,12 @@ func (c *Client) AppRequest( requestID, appRequestBytes, ); err != nil { + c.router.log.Error("unexpected error when sending message", + zap.Stringer("op", message.AppRequestOp), + zap.Stringer("nodeID", nodeID), + zap.Uint32("requestID", requestID), + zap.Error(err), + ) return err } @@ -160,6 +169,12 @@ func (c *Client) CrossChainAppRequest( requestID, PrefixMessage(c.handlerPrefix, appRequestBytes), ); err != nil { + c.router.log.Error("unexpected error when sending message", + zap.Stringer("op", message.CrossChainAppRequestOp), + zap.Stringer("chainID", chainID), + zap.Uint32("requestID", requestID), + zap.Error(err), + ) return err } From 3a38fdbbfe9b01bd94a950bf1584f543ffa2135e Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Thu, 4 Apr 2024 17:36:07 -0400 Subject: [PATCH 5/5] consistency is key --- network/p2p/client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/network/p2p/client.go b/network/p2p/client.go index 3a6a5e6af820..18556bfad1fa 100644 --- a/network/p2p/client.go +++ b/network/p2p/client.go @@ -128,8 +128,13 @@ func (c *Client) AppGossip( config common.SendConfig,
appGossipBytes []byte, ) error { + // Cancellation is removed from this context to avoid erroring unexpectedly. + // SendAppGossip should be non-blocking and any error other than context + // cancellation is unexpected. + ctxWithoutCancel := context.WithoutCancel(ctx) + return c.sender.SendAppGossip( - ctx, + ctxWithoutCancel, config, PrefixMessage(c.handlerPrefix, appGossipBytes), )