Skip to content

Commit

Permalink
Revert "Revert "Use p2p.Network (#384)""
Browse files Browse the repository at this point in the history
This reverts commit 30d99fe.
  • Loading branch information
darioush committed Dec 1, 2023
1 parent 30d99fe commit ad85636
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 70 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.15-0.20231031223857-4957ccb4ee4f
github.com/ava-labs/avalanchego v1.10.17-rc.9
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -33,7 +33,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/status-im/keycard-go v0.2.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a
github.com/tyler-smith/go-bip39 v1.1.0
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
Expand Down
9 changes: 4 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.15-0.20231031223857-4957ccb4ee4f h1:9KiBELZpw+rYdWpaxt3ZputI7zooHcYFg13n7hsWIlI=
github.com/ava-labs/avalanchego v1.10.15-0.20231031223857-4957ccb4ee4f/go.mod h1:XUgFacPFIXauOQlabcWsuRMA47/NSGwPqyOGXE0S/Jc=
github.com/ava-labs/avalanchego v1.10.17-rc.9 h1:qMDE56M33lO8nkXMPmycN8uIT1BL42giR46f8j7kbhg=
github.com/ava-labs/avalanchego v1.10.17-rc.9/go.mod h1:GW0zr0OJfy9Vbki1ChOm47jv0gK+2tnBXP2NpzAiVzE=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -548,7 +548,6 @@ github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobt
github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -557,8 +556,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4=
Expand Down
52 changes: 30 additions & 22 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type Network interface {
// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)

// NewAppProtocol reserves a protocol identifier and returns a corresponding
// client to send messages with
NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error)
}

// network is an implementation of Network that processes message requests for
Expand All @@ -88,16 +92,16 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
router *p2p.Router // handles messages being sent to the generic networking SDK
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
network *p2p.Network
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
Expand All @@ -110,16 +114,16 @@ type network struct {
closed utils.Atomic[bool]
}

func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
self: self,
outstandingRequestHandlers: make(map[uint32]message.ResponseHandler),
activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests),
activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests),
network: p2pNetwork,
gossipHandler: message.NoopMempoolGossipHandler{},
appRequestHandler: message.NoopRequestHandler{},
crossChainRequestHandler: message.NoopCrossChainRequestHandler{},
Expand Down Expand Up @@ -326,8 +330,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
log.Debug("forwarding AppRequest to SDK network", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.network.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -362,8 +366,8 @@ func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
log.Debug("forwarding AppResponse to SDK network", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.network.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -383,8 +387,8 @@ func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reque

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
log.Debug("forwarding AppRequestFailed to SDK network", "nodeID", nodeID, "requestID", requestID)
return n.network.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -456,7 +460,7 @@ func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []
}

// Connected adds the given nodeID to the peer list so that it can receive messages
func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
func (n *network) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
log.Debug("adding new peer", "nodeID", nodeID)

n.lock.Lock()
Expand All @@ -472,11 +476,11 @@ func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *v
}

n.peers.Connected(nodeID, nodeVersion)
return nil
return n.network.Connected(ctx, nodeID, nodeVersion)
}

// Disconnected removes given [nodeID] from the peer list
func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
func (n *network) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
log.Debug("disconnecting peer", "nodeID", nodeID)
n.lock.Lock()
defer n.lock.Unlock()
Expand All @@ -486,7 +490,7 @@ func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
}

n.peers.Disconnected(nodeID)
return nil
return n.network.Disconnected(ctx, nodeID)
}

// Shutdown disconnects all peers
Expand Down Expand Up @@ -539,6 +543,10 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
n.peers.TrackBandwidth(nodeID, bandwidth)
}

func (n *network) NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error) {
return n.network.NewAppProtocol(protocol, handler, options...)
}

// invariant: peer/network must use explicitly even request ids.
// for this reason, [n.requestID] is initialized as zero and incremented by 2.
// This is for backwards-compatibility while the SDK router exists with the
Expand Down
Loading

0 comments on commit ad85636

Please sign in to comment.