Skip to content

Commit

Permalink
Fail outstanding app requests on shutdown (#618)
Browse files Browse the repository at this point in the history
Co-authored-by: aaronbuchwald <aaron.buchwald56@gmail.com>
  • Loading branch information
darioush and aaronbuchwald authored Apr 20, 2023
1 parent c156ccb commit e6bf2a0
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 3 deletions.
60 changes: 57 additions & 3 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"

Expand Down Expand Up @@ -95,6 +96,10 @@ type network struct {
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
Expand Down Expand Up @@ -160,6 +165,10 @@ func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHand
// Returns an error if [appSender] is unable to make the request.
// Assumes write lock is held
func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
if n.closed.Get() {
return nil
}

log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request))
n.peers.TrackPeer(nodeID)

Expand Down Expand Up @@ -196,6 +205,10 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++
Expand All @@ -218,6 +231,10 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
// Send a CrossChainAppResponse to [chainID] in response to a valid message using the same
// [requestID] before the deadline.
func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequest from chain", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request))

var req message.CrossChainRequest
Expand Down Expand Up @@ -258,6 +275,10 @@ func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChai
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand All @@ -281,6 +302,10 @@ func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID i
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand All @@ -302,6 +327,10 @@ func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID i
// sends a response back to the sender if length of response returned by the handler is >0
// expects the deadline to not have been passed
func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
if n.closed.Get() {
return nil
}

log.Debug("received AppRequest from node", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request))

var req message.Request
Expand Down Expand Up @@ -341,6 +370,10 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand All @@ -366,6 +399,10 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
Expand Down Expand Up @@ -419,13 +456,21 @@ func (n *network) markRequestFulfilled(requestID uint32) (message.ResponseHandle

// Gossip sends given gossip message to peers
func (n *network) Gossip(gossip []byte) error {
	// Once the network has been shut down, outbound gossip is silently dropped.
	if !n.closed.Get() {
		return n.appSender.SendAppGossip(context.TODO(), gossip)
	}
	return nil
}

// AppGossip is called by avalanchego -> VM when there is an incoming AppGossip from a peer
// error returned by this function is expected to be treated as fatal by the engine
// returns error if request could not be parsed as message.Request or when the requestHandler returns an error
func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
if n.closed.Get() {
return nil
}

var gossipMsg message.GossipMessage
if _, err := n.codec.Unmarshal(gossipBytes, &gossipMsg); err != nil {
log.Debug("could not parse app gossip", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
Expand All @@ -443,6 +488,10 @@ func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *v
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

if nodeID == n.self {
log.Debug("skipping registering self as peer")
return nil
Expand All @@ -458,6 +507,10 @@ func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

n.peers.Disconnected(nodeID)
return nil
}
Expand All @@ -468,12 +521,13 @@ func (n *network) Shutdown() {
defer n.lock.Unlock()

// clean up any pending requests
for requestID := range n.outstandingRequestHandlers {
for requestID, handler := range n.outstandingRequestHandlers {
_ = handler.OnFailure() // make sure all waiting threads are unblocked
delete(n.outstandingRequestHandlers, requestID)
}

// reset peers
n.peers = NewPeerTracker()
n.peers = NewPeerTracker() // reset peers
n.closed.Set(true) // mark network as closed
}

func (n *network) SetGossipHandler(handler message.GossipHandler) {
Expand Down
84 changes: 84 additions & 0 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/version"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ethcommon "github.com/ethereum/go-ethereum/common"
)
Expand Down Expand Up @@ -222,6 +223,48 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {
assert.Contains(t, err.Error(), "cannot send request to empty nodeID")
}

// TestAppRequestOnShutdown verifies that an app request still outstanding when
// Shutdown is called is failed: the waiting caller is unblocked and receives
// ErrRequestFailed rather than hanging.
func TestAppRequestOnShutdown(t *testing.T) {
	var (
		net    Network
		wg     sync.WaitGroup
		called bool
	)
	sender := testAppSender{
		sendAppRequestFn: func(nodes set.Set[ids.NodeID], requestID uint32, requestBytes []byte) error {
			wg.Add(1)
			// Shutdown runs on its own goroutine to avoid a deadlock, since the
			// network lock is held while this callback executes and Shutdown
			// acquires the same lock.
			go func() {
				defer wg.Done()
				called = true
				// Shut the network down here to ensure the outstanding request
				// is handled as failed.
				net.Shutdown()
			}()
			return nil
		},
	}

	codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
	crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
	net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
	client := NewNetworkClient(net)
	nodeID := ids.GenerateTestNodeID()
	// The original registered the peer twice; once is sufficient.
	require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))

	requestMessage := HelloRequest{Message: "this is a request"}

	wg.Add(1)
	go func() {
		defer wg.Done()
		requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
		require.NoError(t, err)
		responseBytes, _, err := client.SendAppRequestAny(defaultPeerVersion, requestBytes)
		// Fix: require.Error(t, err, ErrRequestFailed) only asserted that err
		// is non-nil (the extra argument is a failure message, not an expected
		// error). ErrorIs pins the actual sentinel, consistent with
		// TestCrossChainRequestOnShutdown.
		require.ErrorIs(t, err, ErrRequestFailed)
		require.Nil(t, responseBytes)
	}()
	wg.Wait()
	require.True(t, called)
}

func TestRequestMinVersion(t *testing.T) {
callNum := uint32(0)
nodeID := ids.GenerateTestNodeID()
Expand Down Expand Up @@ -565,6 +608,47 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {
assert.Equal(t, totalCalls, int(atomic.LoadUint32(&callNum)))
}

// TestCrossChainRequestOnShutdown checks that a cross-chain request that is
// still outstanding when Shutdown is called fails with ErrRequestFailed
// instead of blocking forever.
func TestCrossChainRequestOnShutdown(t *testing.T) {
	var (
		testNet           Network
		pending           sync.WaitGroup
		shutdownTriggered bool
	)
	sender := testAppSender{
		sendCrossChainAppRequestFn: func(requestingChainID ids.ID, requestID uint32, requestBytes []byte) error {
			pending.Add(1)
			// Shutdown must run on a separate goroutine: the network lock is
			// held while this callback executes, and Shutdown takes that lock.
			go func() {
				defer pending.Done()
				shutdownTriggered = true
				// Tear the network down so the outstanding request is failed.
				testNet.Shutdown()
			}()
			return nil
		},
	}
	codecManager := buildCodec(t, TestMessage{})
	crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
	testNet = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
	client := NewNetworkClient(testNet)

	exampleCrossChainRequest := ExampleCrossChainRequest{
		Message: "hello this is an example request",
	}
	chainID := ids.ID(ethcommon.BytesToHash([]byte{1, 2, 3, 4, 5}))

	pending.Add(1)
	go func() {
		defer pending.Done()
		crossChainRequest, err := buildCrossChainRequest(crossChainCodecManager, exampleCrossChainRequest)
		require.NoError(t, err)
		responseBytes, err := client.SendCrossChainRequest(chainID, crossChainRequest)
		require.ErrorIs(t, err, ErrRequestFailed)
		require.Nil(t, responseBytes)
	}()
	pending.Wait()
	require.True(t, shutdownTriggered)
}

func buildCodec(t *testing.T, types ...interface{}) codec.Manager {
codecManager := codec.NewDefaultManager()
c := linearcodec.NewDefault()
Expand Down
34 changes: 34 additions & 0 deletions plugin/evm/syncervm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,40 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) {
testSyncerVM(t, vmSetup, test)
}

// TestVMShutdownWhileSyncing shuts the syncer VM down in the middle of a state
// sync and asserts two things: Shutdown completes (the test's expectation that
// it does not time out), and the interrupted sync surfaces context.Canceled.
func TestVMShutdownWhileSyncing(t *testing.T) {
	var (
		// lock serializes responseIntercept invocations so reqCount is not raced.
		lock    sync.Mutex
		vmSetup *syncVMSetup
	)
	reqCount := 0
	test := syncTest{
		syncableInterval:   256,
		stateSyncMinBlocks: 50, // must be less than [syncableInterval] to perform sync
		syncMode:           block.StateSyncStatic,
		responseIntercept: func(syncerVM *VM, nodeID ids.NodeID, requestID uint32, response []byte) {
			lock.Lock()
			defer lock.Unlock()

			reqCount++
			// Shutdown the VM after 50 requests to interrupt the sync
			if reqCount == 50 {
				// Note this verifies the VM shutdown does not time out while syncing.
				require.NoError(t, vmSetup.syncerVM.Shutdown(context.Background()))
			} else if reqCount < 50 {
				// Responses past the 50th are never delivered, so the sync
				// cannot complete after shutdown.
				// NOTE(review): AppResponse's error return is discarded here —
				// presumably because the VM may already be shutting down;
				// confirm this is intentional.
				syncerVM.AppResponse(context.Background(), nodeID, requestID, response)
			}
		},
		expectedErr: context.Canceled,
	}
	vmSetup = createSyncServerAndClientVMs(t, test)
	defer func() {
		// The server VM was not shut down by the intercept; clean it up here.
		require.NoError(t, vmSetup.serverVM.Shutdown(context.Background()))
	}()

	// Perform sync resulting in early termination.
	testSyncerVM(t, vmSetup, test)
}

func createSyncServerAndClientVMs(t *testing.T, test syncTest) *syncVMSetup {
var (
serverVM, syncerVM *VM
Expand Down

0 comments on commit e6bf2a0

Please sign in to comment.