-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Combine AppGossip and AppGossipSpecific #2836
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
"github.com/ava-labs/avalanchego/network/dialer" | ||
"github.com/ava-labs/avalanchego/network/peer" | ||
"github.com/ava-labs/avalanchego/network/throttling" | ||
"github.com/ava-labs/avalanchego/snow/engine/common" | ||
"github.com/ava-labs/avalanchego/snow/networking/router" | ||
"github.com/ava-labs/avalanchego/snow/networking/sender" | ||
"github.com/ava-labs/avalanchego/subnets" | ||
|
@@ -47,8 +48,7 @@ const ( | |
) | ||
|
||
var ( | ||
_ sender.ExternalSender = (*network)(nil) | ||
_ Network = (*network)(nil) | ||
_ Network = (*network)(nil) | ||
|
||
errNotValidator = errors.New("node is not a validator") | ||
errNotTracked = errors.New("subnet is not tracked") | ||
|
@@ -310,25 +310,42 @@ func NewNetwork( | |
return n, nil | ||
} | ||
|
||
func (n *network) Send(msg message.OutboundMessage, nodeIDs set.Set[ids.NodeID], subnetID ids.ID, allower subnets.Allower) set.Set[ids.NodeID] { | ||
peers := n.getPeers(nodeIDs, subnetID, allower) | ||
n.peerConfig.Metrics.MultipleSendsFailed( | ||
msg.Op(), | ||
nodeIDs.Len()-len(peers), | ||
) | ||
return n.send(msg, peers) | ||
} | ||
|
||
func (n *network) Gossip( | ||
func (n *network) Send( | ||
msg message.OutboundMessage, | ||
config common.SendConfig, | ||
subnetID ids.ID, | ||
numValidatorsToSend int, | ||
numNonValidatorsToSend int, | ||
numPeersToSend int, | ||
allower subnets.Allower, | ||
) set.Set[ids.NodeID] { | ||
peers := n.samplePeers(subnetID, numValidatorsToSend, numNonValidatorsToSend, numPeersToSend, allower) | ||
return n.send(msg, peers) | ||
namedPeers := n.getPeers(config.NodeIDs, subnetID, allower) | ||
n.peerConfig.Metrics.MultipleSendsFailed( | ||
StephenButtolph marked this conversation as resolved.
Show resolved
Hide resolved
|
||
msg.Op(), | ||
config.NodeIDs.Len()-len(namedPeers), | ||
) | ||
|
||
var ( | ||
sampledPeers = n.samplePeers(config, subnetID, allower) | ||
sentTo = set.NewSet[ids.NodeID](len(namedPeers) + len(sampledPeers)) | ||
now = n.peerConfig.Clock.Time() | ||
) | ||
|
||
// send to peers and update metrics | ||
// | ||
// Note: It is guaranteed that namedPeers and sampledPeers are disjoint. | ||
for _, peers := range [][]peer.Peer{namedPeers, sampledPeers} { | ||
for _, peer := range peers { | ||
if peer.Send(n.onCloseCtx, msg) { | ||
StephenButtolph marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sentTo.Add(peer.ID()) | ||
|
||
// TODO: move send fail rate calculations into the peer metrics | ||
// record metrics for success | ||
n.sendFailRateCalculator.Observe(0, now) | ||
} else { | ||
// record metrics for failure | ||
n.sendFailRateCalculator.Observe(1, now) | ||
} | ||
} | ||
} | ||
return sentTo | ||
} | ||
|
||
// HealthCheck returns information about several network layer health checks. | ||
|
@@ -695,25 +712,24 @@ func (n *network) getPeers( | |
return peers | ||
} | ||
|
||
// samplePeers samples connected peers attempting to align with the number of | ||
// requested validators, non-validators, and peers. This function will | ||
// explicitly ignore nodeIDs already included in the send config. | ||
func (n *network) samplePeers( | ||
config common.SendConfig, | ||
subnetID ids.ID, | ||
numValidatorsToSample, | ||
numNonValidatorsToSample int, | ||
numPeersToSample int, | ||
allower subnets.Allower, | ||
) []peer.Peer { | ||
// If there are fewer validators than [numValidatorsToSample], then only | ||
// sample [numValidatorsToSample] validators. | ||
subnetValidatorsLen := n.config.Validators.Count(subnetID) | ||
if subnetValidatorsLen < numValidatorsToSample { | ||
numValidatorsToSample = subnetValidatorsLen | ||
} | ||
// As an optimization, if there are fewer validators than | ||
// [numValidatorsToSample], only attempt to sample [numValidatorsToSample] | ||
// validators to potentially avoid iterating over the entire peer set. | ||
numValidatorsToSample := min(config.Validators, n.config.Validators.Count(subnetID)) | ||
|
||
n.peersLock.RLock() | ||
defer n.peersLock.RUnlock() | ||
|
||
return n.connectedPeers.Sample( | ||
numValidatorsToSample+numNonValidatorsToSample+numPeersToSample, | ||
numValidatorsToSample+config.NonValidators+config.Peers, | ||
func(p peer.Peer) bool { | ||
// Only return peers that are tracking [subnetID] | ||
trackedSubnets := p.TrackedSubnets() | ||
|
@@ -722,14 +738,20 @@ func (n *network) samplePeers( | |
} | ||
|
||
peerID := p.ID() | ||
// if the peer was already explicitly included, don't include in the | ||
// sample | ||
if config.NodeIDs.Contains(peerID) { | ||
return false | ||
} | ||
Comment on lines
+741
to
+745
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only slightly counter intuitive part of the code. Technically, we could inverse this and explicitly return true here (and remove the |
||
|
||
_, isValidator := n.config.Validators.GetValidator(subnetID, peerID) | ||
// check if the peer is allowed to connect to the subnet | ||
if !allower.IsAllowed(peerID, isValidator) { | ||
return false | ||
} | ||
|
||
if numPeersToSample > 0 { | ||
numPeersToSample-- | ||
if config.Peers > 0 { | ||
config.Peers-- | ||
return true | ||
} | ||
|
||
|
@@ -738,37 +760,12 @@ func (n *network) samplePeers( | |
return numValidatorsToSample >= 0 | ||
} | ||
|
||
numNonValidatorsToSample-- | ||
return numNonValidatorsToSample >= 0 | ||
config.NonValidators-- | ||
return config.NonValidators >= 0 | ||
}, | ||
) | ||
} | ||
|
||
// send the message to the provided peers. | ||
// | ||
// send takes ownership of the provided message reference. So, the provided | ||
// message should only be inspected if the reference has been externally | ||
// increased. | ||
func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) set.Set[ids.NodeID] { | ||
sentTo := set.NewSet[ids.NodeID](len(peers)) | ||
now := n.peerConfig.Clock.Time() | ||
|
||
// send to peer and update metrics | ||
for _, peer := range peers { | ||
if peer.Send(n.onCloseCtx, msg) { | ||
sentTo.Add(peer.ID()) | ||
|
||
// TODO: move send fail rate calculations into the peer metrics | ||
// record metrics for success | ||
n.sendFailRateCalculator.Observe(0, now) | ||
} else { | ||
// record metrics for failure | ||
n.sendFailRateCalculator.Observe(1, now) | ||
} | ||
} | ||
return sentTo | ||
} | ||
|
||
func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) { | ||
n.peersLock.Lock() | ||
defer n.peersLock.Unlock() | ||
|
@@ -1208,10 +1205,10 @@ func (n *network) runTimers() { | |
// pullGossipPeerLists requests validators from peers in the network | ||
func (n *network) pullGossipPeerLists() { | ||
peers := n.samplePeers( | ||
common.SendConfig{ | ||
Validators: 1, | ||
}, | ||
constants.PrimaryNetworkID, | ||
1, // numValidatorsToSample | ||
0, // numNonValidatorsToSample | ||
0, // numPeersToSample | ||
subnets.NoOpAllower, | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Network
embedssender.ExternalSender