Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove cancellation for Send*AppRequest messages #2915

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"errors"
"fmt"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/set"
)
Expand Down Expand Up @@ -72,6 +75,14 @@ func (c *Client) AppRequest(
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendAppRequest should be non-blocking and any error other than context
// cancellation is unexpected.
//
// This guarantees that the router should never receive an unexpected
// AppResponse.
ctxWithoutCancel := context.WithoutCancel(ctx)

c.router.lock.Lock()
defer c.router.lock.Unlock()

Expand All @@ -87,11 +98,17 @@ func (c *Client) AppRequest(
}

if err := c.sender.SendAppRequest(
ctx,
ctxWithoutCancel,
set.Of(nodeID),
requestID,
appRequestBytes,
); err != nil {
c.router.log.Error("unexpected error when sending message",
zap.Stringer("op", message.AppRequestOp),
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
return err
}

Expand All @@ -111,8 +128,13 @@ func (c *Client) AppGossip(
config common.SendConfig,
appGossipBytes []byte,
) error {
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendAppGossip should be non-blocking and any error other than context
// cancellation is unexpected.
ctxWithoutCancel := context.WithoutCancel(ctx)

return c.sender.SendAppGossip(
ctx,
ctxWithoutCancel,
config,
PrefixMessage(c.handlerPrefix, appGossipBytes),
)
Expand All @@ -126,6 +148,14 @@ func (c *Client) CrossChainAppRequest(
appRequestBytes []byte,
onResponse CrossChainAppResponseCallback,
) error {
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendCrossChainAppRequest should be non-blocking and any error other than
// context cancellation is unexpected.
//
// This guarantees that the router should never receive an unexpected
// CrossChainAppResponse.
ctxWithoutCancel := context.WithoutCancel(ctx)

c.router.lock.Lock()
defer c.router.lock.Unlock()

Expand All @@ -139,11 +169,17 @@ func (c *Client) CrossChainAppRequest(
}

if err := c.sender.SendCrossChainAppRequest(
ctx,
ctxWithoutCancel,
chainID,
requestID,
PrefixMessage(c.handlerPrefix, appRequestBytes),
); err != nil {
c.router.log.Error("unexpected error when sending message",
zap.Stringer("op", message.CrossChainAppRequestOp),
zap.Stringer("chainID", chainID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
return err
}

Expand Down
80 changes: 80 additions & 0 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,48 @@ func TestAppRequestResponse(t *testing.T) {
<-done
}

// Tests that the Client does not provide a cancelled context to the AppSender.
func TestAppRequestCancelledContext(t *testing.T) {
require := require.New(t)
ctx := context.Background()

sentMessages := make(chan []byte, 1)
sender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, _ set.Set[ids.NodeID], _ uint32, msgBytes []byte) error {
require.NoError(ctx.Err())
sentMessages <- msgBytes
return nil
},
}
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(handlerID)

wantResponse := []byte("response")
wantNodeID := ids.GenerateTestNodeID()
done := make(chan struct{})

callback := func(_ context.Context, gotNodeID ids.NodeID, gotResponse []byte, err error) {
require.Equal(wantNodeID, gotNodeID)
require.NoError(err)
require.Equal(wantResponse, gotResponse)

close(done)
}

cancelledCtx, cancel := context.WithCancel(ctx)
cancel()

want := []byte("request")
require.NoError(client.AppRequest(cancelledCtx, set.Of(wantNodeID), want, callback))
got := <-sentMessages
require.Equal(handlerPrefix, got[0])
require.Equal(want, got[1:])

require.NoError(network.AppResponse(ctx, wantNodeID, 1, wantResponse))
<-done
}

// Tests that the Client callback is given an error if the request fails
func TestAppRequestFailed(t *testing.T) {
require := require.New(t)
Expand Down Expand Up @@ -241,6 +283,44 @@ func TestCrossChainAppRequestResponse(t *testing.T) {
<-done
}

// Tests that the Client does not provide a cancelled context to the AppSender.
func TestCrossChainAppRequestCancelledContext(t *testing.T) {
require := require.New(t)
ctx := context.Background()

sentMessages := make(chan []byte, 1)
sender := &common.SenderTest{
SendCrossChainAppRequestF: func(ctx context.Context, _ ids.ID, _ uint32, msgBytes []byte) {
require.NoError(ctx.Err())
sentMessages <- msgBytes
},
}
network, err := NewNetwork(logging.NoLog{}, sender, prometheus.NewRegistry(), "")
require.NoError(err)
client := network.NewClient(handlerID)

cancelledCtx, cancel := context.WithCancel(ctx)
cancel()

wantChainID := ids.GenerateTestID()
wantResponse := []byte("response")
done := make(chan struct{})

callback := func(_ context.Context, gotChainID ids.ID, gotResponse []byte, err error) {
require.Equal(wantChainID, gotChainID)
require.NoError(err)
require.Equal(wantResponse, gotResponse)

close(done)
}

require.NoError(client.CrossChainAppRequest(cancelledCtx, wantChainID, []byte("request"), callback))
<-sentMessages

require.NoError(network.CrossChainAppResponse(ctx, wantChainID, 1, wantResponse))
<-done
}

// Tests that the Client callback is given an error if the request fails
func TestCrossChainAppRequestFailed(t *testing.T) {
require := require.New(t)
Expand Down
23 changes: 16 additions & 7 deletions snow/engine/common/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,16 @@ type QuerySender interface {
// NetworkAppSender sends VM-level messages to nodes in the network.
type NetworkAppSender interface {
// Send an application-level request.
// A nil return value guarantees that for each nodeID in [nodeIDs],
// the VM corresponding to this AppSender eventually receives either:
//
// The VM corresponding to this AppSender may receive either:
// * An AppResponse from nodeID with ID [requestID]
// * An AppRequestFailed from nodeID with ID [requestID]
// Exactly one of the above messages will eventually be received per nodeID.
//
// A nil return value guarantees that the VM corresponding to this AppSender
// will receive exactly one of the above messages.
//
// A non-nil return value guarantees that the VM corresponding to this
// AppSender will receive at most one of the above messages.
SendAppRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) error
// Send an application-level response to a request.
// This response must be in response to an AppRequest that the VM corresponding
Expand All @@ -192,12 +197,16 @@ type CrossChainAppSender interface {
// SendCrossChainAppRequest sends an application-level request to a
// specific chain.
//
// A nil return value guarantees that the VM corresponding to this
// CrossChainAppSender eventually receives either:
// The VM corresponding to this CrossChainAppSender may receive either:
// * A CrossChainAppResponse from [chainID] with ID [requestID]
// * A CrossChainAppRequestFailed from [chainID] with ID [requestID]
// Exactly one of the above messages will eventually be received from
// [chainID].
//
// A nil return value guarantees that the VM corresponding to this
// CrossChainAppSender will eventually receive exactly one of the above
// messages.
//
// A non-nil return value guarantees that the VM corresponding to this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment 🙏

// CrossChainAppSender will receive at most one of the above messages.
SendCrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, appRequestBytes []byte) error
// SendCrossChainAppResponse sends an application-level response to a
// specific chain
Expand Down
9 changes: 8 additions & 1 deletion x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,14 @@ func (c *networkClient) sendRequestLocked(

// Send an app request to the peer.
nodeIDs := set.Of(nodeID)
if err := c.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil {
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendAppRequest should be non-blocking and any error other than context
// cancellation is unexpected.
//
// This guarantees that the network should never receive an unexpected
// AppResponse.
ctxWithoutCancel := context.WithoutCancel(ctx)
if err := c.appSender.SendAppRequest(ctxWithoutCancel, nodeIDs, requestID, request); err != nil {
c.lock.Unlock()
c.log.Fatal("failed to send app request",
zap.Stringer("nodeID", nodeID),
Expand Down
Loading