Skip to content

Commit

Permalink
add context to network handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyonur committed Nov 8, 2023
1 parent bf20517 commit 8e23ec4
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 62 deletions.
57 changes: 36 additions & 21 deletions peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package peer

import (
"context"
"errors"

"github.com/ava-labs/avalanchego/ids"
Expand All @@ -23,15 +24,15 @@ type NetworkClient interface {
// node version greater than or equal to minVersion.
// Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if
// the request should be retried.
SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error)
SendAppRequestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error)

// SendAppRequest synchronously sends request to the selected nodeID
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error)
SendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// SendCrossChainRequest sends a request to a specific blockchain running on this node.
// Returns response bytes, and ErrRequestFailed if the request failed.
SendCrossChainRequest(chainID ids.ID, request []byte) ([]byte, error)
SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error)

// Gossip sends given gossip message to peers
Gossip(gossip []byte) error
Expand Down Expand Up @@ -59,45 +60,59 @@ func NewNetworkClient(network Network) NetworkClient {
// node version greater than or equal to minVersion.
// Returns response bytes, the ID of the chosen peer, and ErrRequestFailed if
// the request should be retried.
func (c *client) SendAppRequestAny(minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) {
func (c *client) SendAppRequestAny(ctx context.Context, minVersion *version.Application, request []byte) ([]byte, ids.NodeID, error) {
waitingHandler := newWaitingResponseHandler()
nodeID, err := c.network.SendAppRequestAny(minVersion, request, waitingHandler)
nodeID, err := c.network.SendAppRequestAny(ctx, minVersion, request, waitingHandler)
if err != nil {
return nil, nodeID, err
}
response := <-waitingHandler.responseChan
if waitingHandler.failed {
return nil, nodeID, ErrRequestFailed

select {
case <-ctx.Done():
return nil, nodeID, ctx.Err()
case response := <-waitingHandler.responseChan:
if waitingHandler.failed {
return nil, nodeID, ErrRequestFailed
}
return response, nodeID, nil
}
return response, nodeID, nil
}

// SendAppRequest synchronously sends request to the specified nodeID
// Returns response bytes and ErrRequestFailed if the request should be retried.
func (c *client) SendAppRequest(nodeID ids.NodeID, request []byte) ([]byte, error) {
func (c *client) SendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error) {
waitingHandler := newWaitingResponseHandler()
if err := c.network.SendAppRequest(nodeID, request, waitingHandler); err != nil {
if err := c.network.SendAppRequest(ctx, nodeID, request, waitingHandler); err != nil {
return nil, err
}
response := <-waitingHandler.responseChan
if waitingHandler.failed {
return nil, ErrRequestFailed

select {
case <-ctx.Done():
return nil, ctx.Err()
case response := <-waitingHandler.responseChan:
if waitingHandler.failed {
return nil, ErrRequestFailed
}
return response, nil
}
return response, nil
}

// SendCrossChainRequest synchronously sends request to the specified chainID
// Returns response bytes and ErrRequestFailed if the request should be retried.
func (c *client) SendCrossChainRequest(chainID ids.ID, request []byte) ([]byte, error) {
func (c *client) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error) {
waitingHandler := newWaitingResponseHandler()
if err := c.network.SendCrossChainRequest(chainID, request, waitingHandler); err != nil {
if err := c.network.SendCrossChainRequest(ctx, chainID, request, waitingHandler); err != nil {
return nil, err
}
response := <-waitingHandler.responseChan
if waitingHandler.failed {
return nil, ErrRequestFailed
select {
case <-ctx.Done():
return nil, ctx.Err()
case response := <-waitingHandler.responseChan:
if waitingHandler.failed {
return nil, ErrRequestFailed
}
return response, nil
}
return response, nil
}

func (c *client) Gossip(gossip []byte) error {
Expand Down
28 changes: 14 additions & 14 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ type Network interface {
// node version greater than or equal to minVersion.
// Returns the ID of the chosen peer, and an error if the request could not
// be sent to a peer with the desired [minVersion].
SendAppRequestAny(minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.NodeID, error)
SendAppRequestAny(ctx context.Context, minVersion *version.Application, message []byte, handler message.ResponseHandler) (ids.NodeID, error)

// SendAppRequest sends message to given nodeID, notifying handler when there's a response or timeout
SendAppRequest(nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error
SendAppRequest(ctx context.Context, nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error

// Gossip sends given gossip message to peers
Gossip(gossip []byte) error

// SendCrossChainRequest sends a message to given chainID notifying handler when there's a response or timeout
SendCrossChainRequest(chainID ids.ID, message []byte, handler message.ResponseHandler) error
SendCrossChainRequest(ctx context.Context, chainID ids.ID, message []byte, handler message.ResponseHandler) error

// Shutdown stops all peer channel listeners and marks the node to have stopped
// n.Start() can be called again but the peers will have to be reconnected
Expand Down Expand Up @@ -134,37 +134,37 @@ func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Mana
// the request will be sent to any peer regardless of their version.
// Returns the ID of the chosen peer, and an error if the request could not
// be sent to a peer with the desired [minVersion].
func (n *network) SendAppRequestAny(minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.NodeID, error) {
func (n *network) SendAppRequestAny(ctx context.Context, minVersion *version.Application, request []byte, handler message.ResponseHandler) (ids.NodeID, error) {
// Take a slot from total [activeAppRequests] and block until a slot becomes available.
if err := n.activeAppRequests.Acquire(context.Background(), 1); err != nil {
if err := n.activeAppRequests.Acquire(ctx, 1); err != nil {
return ids.EmptyNodeID, errAcquiringSemaphore
}

n.lock.Lock()
defer n.lock.Unlock()
if nodeID, ok := n.peers.GetAnyPeer(minVersion); ok {
return nodeID, n.sendAppRequest(nodeID, request, handler)
return nodeID, n.sendAppRequest(ctx, nodeID, request, handler)
}

n.activeAppRequests.Release(1)
return ids.EmptyNodeID, fmt.Errorf("no peers found matching version %s out of %d peers", minVersion, n.peers.Size())
}

// SendAppRequest sends request message bytes to specified nodeID, notifying the responseHandler on response or failure
func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
func (n *network) SendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
if nodeID == ids.EmptyNodeID {
return fmt.Errorf("cannot send request to empty nodeID, nodeID=%s, requestLen=%d", nodeID, len(request))
}

// Take a slot from total [activeAppRequests] and block until a slot becomes available.
if err := n.activeAppRequests.Acquire(context.Background(), 1); err != nil {
if err := n.activeAppRequests.Acquire(ctx, 1); err != nil {
return errAcquiringSemaphore
}

n.lock.Lock()
defer n.lock.Unlock()

return n.sendAppRequest(nodeID, request, responseHandler)
return n.sendAppRequest(ctx, nodeID, request, responseHandler)
}

// sendAppRequest sends request message bytes to specified nodeID and adds [responseHandler] to [outstandingRequestHandlers]
Expand All @@ -173,7 +173,7 @@ func (n *network) SendAppRequest(nodeID ids.NodeID, request []byte, responseHand
// Releases active requests semaphore if there was an error in sending the request
// Returns an error if [appSender] is unable to make the request.
// Assumes write lock is held
func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
func (n *network) sendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte, responseHandler message.ResponseHandler) error {
if n.closed.Get() {
n.activeAppRequests.Release(1)
return nil
Expand All @@ -190,7 +190,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand

// Send app request to [nodeID].
// On failure, release the slot from [activeAppRequests] and delete request from [outstandingRequestHandlers]
if err := n.appSender.SendAppRequest(context.TODO(), nodeIDs, requestID, request); err != nil {
if err := n.appSender.SendAppRequest(ctx, nodeIDs, requestID, request); err != nil {
n.activeAppRequests.Release(1)
delete(n.outstandingRequestHandlers, requestID)
return err
Expand All @@ -203,9 +203,9 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
// SendCrossChainRequest sends request message bytes to specified chainID and adds [handler] to [outstandingRequestHandlers]
// so that it can be invoked when the network receives either a response or failure message.
// Returns an error if [appSender] is unable to make the request.
func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler message.ResponseHandler) error {
func (n *network) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte, handler message.ResponseHandler) error {
// Take a slot from total [activeCrossChainRequests] and block until a slot becomes available.
if err := n.activeCrossChainRequests.Acquire(context.Background(), 1); err != nil {
if err := n.activeCrossChainRequests.Acquire(ctx, 1); err != nil {
return errAcquiringSemaphore
}

Expand All @@ -222,7 +222,7 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler

// Send cross chain request to [chainID].
// On failure, release the slot from [activeCrossChainRequests] and delete request from [outstandingRequestHandlers].
if err := n.appSender.SendCrossChainAppRequest(context.TODO(), chainID, requestID, request); err != nil {
if err := n.appSender.SendCrossChainAppRequest(ctx, chainID, requestID, request); err != nil {
n.activeCrossChainRequests.Release(1)
delete(n.outstandingRequestHandlers, requestID)
return err
Expand Down
Loading

0 comments on commit 8e23ec4

Please sign in to comment.