Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use p2p.Network #384

Merged
merged 8 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.16-rc.0
github.com/ava-labs/avalanchego v1.10.17-rc.9
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -33,12 +33,11 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/status-im/keycard-go v0.2.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a
github.com/tyler-smith/go-bip39 v1.1.0
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
go.uber.org/goleak v1.2.1
go.uber.org/mock v0.2.0
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/sync v0.3.0
Expand Down Expand Up @@ -128,9 +127,9 @@ require (
go.opentelemetry.io/otel/sdk v1.11.0 // indirect
go.opentelemetry.io/otel/trace v1.11.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
go.uber.org/mock v0.2.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/term v0.13.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
Expand Down
23 changes: 10 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.16-rc.0 h1:I2k3hKjSr7PH5KKNxFeYiZ1PfyMlYvPXJZLnYU8uGsQ=
github.com/ava-labs/avalanchego v1.10.16-rc.0/go.mod h1:fHTzxKZOMdM0n4EEXDDR0V3Ieb/Jnz7PM8zAsJRsh2U=
github.com/ava-labs/avalanchego v1.10.17-rc.0.0.20231129204442-3a8cca5f31b9 h1:IDbRSp4QU5gcsC31uxOMc1089nTDhZpr6c+z7pHVNks=
github.com/ava-labs/avalanchego v1.10.17-rc.0.0.20231129204442-3a8cca5f31b9/go.mod h1:JuMiDdOmbTZfZty/RFszDPpKd1KEy9aivjxbcc1TPZo=
github.com/ava-labs/avalanchego v1.10.17-rc.9 h1:qMDE56M33lO8nkXMPmycN8uIT1BL42giR46f8j7kbhg=
github.com/ava-labs/avalanchego v1.10.17-rc.9/go.mod h1:GW0zr0OJfy9Vbki1ChOm47jv0gK+2tnBXP2NpzAiVzE=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo=
Expand Down Expand Up @@ -549,7 +550,6 @@ github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobt
github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -558,8 +558,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4=
Expand Down Expand Up @@ -627,17 +627,14 @@ go.opentelemetry.io/otel/trace v1.11.0/go.mod h1:nyYjis9jy0gytE9LXGU+/m1sHTKbRY0
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
52 changes: 30 additions & 22 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type Network interface {
// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)

// NewAppProtocol reserves a protocol identifier and returns a corresponding
// client to send messages with
NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error)
}

// network is an implementation of Network that processes message requests for
Expand All @@ -88,16 +92,16 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
router *p2p.Router // handles messages being sent to the generic networking SDK
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
network *p2p.Network
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
Expand All @@ -110,16 +114,16 @@ type network struct {
closed utils.Atomic[bool]
}

func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
self: self,
outstandingRequestHandlers: make(map[uint32]message.ResponseHandler),
activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests),
activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests),
network: p2pNetwork,
gossipHandler: message.NoopMempoolGossipHandler{},
appRequestHandler: message.NoopRequestHandler{},
crossChainRequestHandler: message.NoopCrossChainRequestHandler{},
Expand Down Expand Up @@ -326,8 +330,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
log.Debug("forwarding AppRequest to SDK network", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.network.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -362,8 +366,8 @@ func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
log.Debug("forwarding AppResponse to SDK network", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.network.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -383,8 +387,8 @@ func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reque

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
log.Debug("forwarding AppRequestFailed to SDK network", "nodeID", nodeID, "requestID", requestID)
return n.network.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -456,7 +460,7 @@ func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []
}

// Connected adds the given nodeID to the peer list so that it can receive messages
func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
func (n *network) Connected(ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application) error {
log.Debug("adding new peer", "nodeID", nodeID)

n.lock.Lock()
Expand All @@ -472,11 +476,11 @@ func (n *network) Connected(_ context.Context, nodeID ids.NodeID, nodeVersion *v
}

n.peers.Connected(nodeID, nodeVersion)
return nil
return n.network.Connected(ctx, nodeID, nodeVersion)
}

// Disconnected removes given [nodeID] from the peer list
func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
func (n *network) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
log.Debug("disconnecting peer", "nodeID", nodeID)
n.lock.Lock()
defer n.lock.Unlock()
Expand All @@ -486,7 +490,7 @@ func (n *network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
}

n.peers.Disconnected(nodeID)
return nil
return n.network.Disconnected(ctx, nodeID)
}

// Shutdown disconnects all peers
Expand Down Expand Up @@ -539,6 +543,10 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
n.peers.TrackBandwidth(nodeID, bandwidth)
}

func (n *network) NewAppProtocol(protocol uint64, handler p2p.Handler, options ...p2p.ClientOption) (*p2p.Client, error) {
return n.network.NewAppProtocol(protocol, handler, options...)
}

// invariant: peer/network must use explicitly even request ids.
// for this reason, [n.requestID] is initialized as zero and incremented by 2.
// This is for backwards-compatibility while the SDK router exists with the
Expand Down
Loading
Loading