Skip to content

Commit

Permalink
Cleaner logic for gossip subscriptions for new forks (#4030)
Browse files Browse the repository at this point in the history
## Issue Addressed

Cleaner resolution for #4006 

## Proposed Changes

We are currently subscribing to core topics of new forks way before the actual fork since we had just a single `CORE_TOPICS` array. This PR separates the core topics for every fork and subscribes to only required topics based on the current fork.
Also adds logic for subscribing to the core topics of a new fork only 2 slots before the fork happens.

2 slots is to give enough time for the gossip meshes to form. 

Currently doesn't add logic to remove topics from older forks in new forks. For e.g. in the coupled 4844 world, we had to remove the `BeaconBlock` topic in favour of `BeaconBlocksAndBlobsSidecar` at the 4844 fork. It should be easy enough to add though. Not adding it because I'm assuming that  #4019 will get merged before this PR and we won't require any deletion logic. Happy to add it regardless though.
  • Loading branch information
pawanjay176 committed Mar 1, 2023
1 parent ca1ce38 commit 5b18fd9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
16 changes: 12 additions & 4 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use crate::rpc::*;
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet,
SubnetDiscovery,
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
SnappyTransform, Subnet, SubnetDiscovery,
};
use crate::EnrExt;
use crate::Eth2Enr;
Expand All @@ -41,6 +41,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use types::ForkName;
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
Expand Down Expand Up @@ -559,13 +560,20 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
self.unsubscribe(gossip_topic)
}

/// Subscribe to all currently subscribed topics with the new fork digest.
pub fn subscribe_new_fork_topics(&mut self, new_fork_digest: [u8; 4]) {
/// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
// Subscribe to existing topics with new fork digest
let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
for mut topic in subscriptions.into_iter() {
topic.fork_digest = new_fork_digest;
self.subscribe(topic);
}

// Subscribe to core topics for the new fork
for kind in fork_core_topics(&new_fork) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}
}

/// Unsubscribe from all topics that doesn't have the given fork_digest
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS,
LIGHT_CLIENT_GOSSIP_TOPICS,
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
GossipTopic, LIGHT_CLIENT_GOSSIP_TOPICS,
};
43 changes: 39 additions & 4 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use libp2p::gossipsub::{IdentTopic as Topic, TopicHash};
use serde_derive::{Deserialize, Serialize};
use strum::AsRefStr;
use types::{SubnetId, SyncSubnetId};
use types::{ForkName, SubnetId, SyncSubnetId};

use crate::Subnet;

Expand All @@ -22,21 +22,45 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";

pub const CORE_TOPICS: [GossipKind; 7] = [
pub const BASE_CORE_TOPICS: [GossipKind; 5] = [
GossipKind::BeaconBlock,
GossipKind::BeaconAggregateAndProof,
GossipKind::VoluntaryExit,
GossipKind::ProposerSlashing,
GossipKind::AttesterSlashing,
GossipKind::SignedContributionAndProof,
GossipKind::BlsToExecutionChange,
];

pub const ALTAIR_CORE_TOPICS: [GossipKind; 1] = [GossipKind::SignedContributionAndProof];

pub const CAPELLA_CORE_TOPICS: [GossipKind; 1] = [GossipKind::BlsToExecutionChange];

pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
];

/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics(fork_name: &ForkName) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
ForkName::Merge => vec![],
ForkName::Capella => CAPELLA_CORE_TOPICS.to_vec(),
}
}

/// Returns all the topics that we need to subscribe to for a given fork
/// including topics from older forks and new topics for the current fork.
pub fn core_topics_to_subscribe(mut current_fork: ForkName) -> Vec<GossipKind> {
let mut topics = fork_core_topics(&current_fork);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics(&previous_fork);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
}
topics
}

/// A gossipsub topic which encapsulates the type of messages that should be sent and received over
/// the pubsub protocol and the way the messages should be encoded.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -390,4 +414,15 @@ mod tests {
assert_eq!("proposer_slashing", ProposerSlashing.as_ref());
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
}

#[test]
fn test_core_topics_to_subscribe() {
let mut all_topics = Vec::new();
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);

let latest_fork = *ForkName::list_all().last().unwrap();
assert_eq!(core_topics_to_subscribe(latest_fork), all_topics);
}
}
6 changes: 3 additions & 3 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lighthouse_network::{
Context, PeerAction, PeerRequestId, PubsubMessage, ReportSource, Request, Response, Subnet,
};
use lighthouse_network::{
types::{GossipEncoding, GossipTopic},
types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
MessageId, NetworkEvent, NetworkGlobals, PeerId,
};
use slog::{crit, debug, error, info, o, trace, warn};
Expand Down Expand Up @@ -445,7 +445,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let fork_version = self.beacon_chain.spec.fork_version_for_name(fork_name);
let fork_digest = ChainSpec::compute_fork_digest(fork_version, self.beacon_chain.genesis_validators_root);
info!(self.log, "Subscribing to new fork topics");
self.libp2p.subscribe_new_fork_topics(fork_digest);
self.libp2p.subscribe_new_fork_topics(fork_name, fork_digest);
self.next_fork_subscriptions = Box::pin(None.into());
}
else {
Expand Down Expand Up @@ -684,7 +684,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}

let mut subscribed_topics: Vec<GossipTopic> = vec![];
for topic_kind in lighthouse_network::types::CORE_TOPICS.iter() {
for topic_kind in core_topics_to_subscribe(self.fork_context.current_fork()) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
topic_kind.clone(),
Expand Down

0 comments on commit 5b18fd9

Please sign in to comment.