diff --git a/Cargo.lock b/Cargo.lock index 4098db65af4..6085dcd4854 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2961,6 +2961,7 @@ dependencies = [ "cumulus-relay-chain-rpc-interface", "futures", "lru 0.11.0", + "parking_lot 0.12.1", "polkadot-availability-recovery", "polkadot-collator-protocol", "polkadot-core-primitives", diff --git a/client/relay-chain-minimal-node/Cargo.toml b/client/relay-chain-minimal-node/Cargo.toml index 7f789f107bf..b8bbb621281 100644 --- a/client/relay-chain-minimal-node/Cargo.toml +++ b/client/relay-chain-minimal-node/Cargo.toml @@ -43,3 +43,4 @@ tracing = "0.1.37" async-trait = "0.1.72" futures = "0.3.28" tokio = { version = "1.29.1", features = ["macros"] } +parking_lot = "0.12.0" diff --git a/client/relay-chain-minimal-node/src/collator_overseer.rs b/client/relay-chain-minimal-node/src/collator_overseer.rs index b488db962f3..04922db74fd 100644 --- a/client/relay-chain-minimal-node/src/collator_overseer.rs +++ b/client/relay-chain-minimal-node/src/collator_overseer.rs @@ -16,7 +16,8 @@ use futures::{select, StreamExt}; use lru::LruCache; -use std::sync::Arc; +use parking_lot::Mutex; +use std::{collections::HashMap, sync::Arc}; use polkadot_availability_recovery::AvailabilityRecoverySubsystem; use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide}; @@ -27,7 +28,7 @@ use polkadot_network_bridge::{ use polkadot_node_collation_generation::CollationGenerationSubsystem; use polkadot_node_core_runtime_api::RuntimeApiSubsystem; use polkadot_node_network_protocol::{ - peer_set::PeerSetProtocolNames, + peer_set::{PeerSet, PeerSetProtocolNames}, request_response::{ v1::{AvailableDataFetchingRequest, CollationFetchingRequest}, IncomingRequestReceiver, ReqProtocolNames, @@ -41,7 +42,7 @@ use polkadot_overseer::{ use polkadot_primitives::CollatorPair; use sc_authority_discovery::Service as AuthorityDiscoveryService; -use sc_network::NetworkStateInfo; +use sc_network::{service::traits::NotificationService, NetworkStateInfo}; use sc_service::TaskManager; use sp_runtime::traits::Block as BlockT; @@ -74,6 +75,8 @@ pub(crate) struct CollatorOverseerGenArgs<'a> { pub req_protocol_names: ReqProtocolNames, /// Peerset protocols name mapping pub peer_set_protocol_names: PeerSetProtocolNames, + /// Notification services. + pub notification_services: HashMap>, } fn build_overseer( @@ -90,6 +93,7 @@ fn build_overseer( collator_pair, req_protocol_names, peer_set_protocol_names, + notification_services, }: CollatorOverseerGenArgs<'_>, ) -> Result< (Overseer, Arc>, OverseerHandle), @@ -97,6 +101,8 @@ fn build_overseer( > { let spawner = SpawnGlue(spawner); let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; + let notification_sinks = Arc::new(Mutex::new(HashMap::new())); + let builder = Overseer::builder() .availability_distribution(DummySubsystem) .availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip( @@ -126,6 +132,8 @@ fn build_overseer( sync_oracle, network_bridge_metrics.clone(), peer_set_protocol_names.clone(), + notification_services, + notification_sinks.clone(), )) .network_bridge_tx(NetworkBridgeTxSubsystem::new( network_service, @@ -133,6 +141,7 @@ fn build_overseer( network_bridge_metrics, req_protocol_names, peer_set_protocol_names, + notification_sinks, )) .provisioner(DummySubsystem) .runtime_api(RuntimeApiSubsystem::new( diff --git a/client/relay-chain-minimal-node/src/lib.rs b/client/relay-chain-minimal-node/src/lib.rs index 6def072913b..0c6c09a0822 100644 --- a/client/relay-chain-minimal-node/src/lib.rs +++ b/client/relay-chain-minimal-node/src/lib.rs @@ -21,7 +21,7 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcInterface, Url}; use network::build_collator_network; use polkadot_network_bridge::{peer_sets_info, IsAuthority}; use polkadot_node_network_protocol::{ - peer_set::PeerSetProtocolNames, + peer_set::{PeerSet, PeerSetProtocolNames}, request_response::{v1, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames}, }; @@ -29,12 +29,15 @@ use polkadot_node_subsystem_util::metrics::prometheus::Registry; use polkadot_primitives::CollatorPair; use sc_authority_discovery::Service as AuthorityDiscoveryService; -use sc_network::{config::FullNetworkConfiguration, Event, NetworkEventStream, NetworkService}; +use sc_network::{ + config::FullNetworkConfiguration, service::traits::NotificationService, Event, + NetworkEventStream, NetworkService, +}; use sc_service::{Configuration, TaskManager}; use sp_runtime::{app_crypto::Pair, traits::Block as BlockT}; use futures::StreamExt; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; mod collator_overseer; @@ -143,9 +146,13 @@ async fn new_minimal_relay_chain( PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No }; - for config in peer_sets_info(is_authority, &peer_set_protocol_names) { - net_config.add_notification_protocol(config); - } + let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names) + .into_iter() + .map(|(config, (peerset, service))| { + net_config.add_notification_protocol(config); + (peerset, service) + }) + .collect::>>(); let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); let (collation_req_receiver, available_data_req_receiver) = @@ -184,6 +191,7 @@ async fn new_minimal_relay_chain( collator_pair, req_protocol_names: request_protocol_names, peer_set_protocol_names, + notification_services, }; let overseer_handle = diff --git a/client/relay-chain-minimal-node/src/network.rs b/client/relay-chain-minimal-node/src/network.rs index 5097e6ce33a..05bfa0c65fb 100644 --- a/client/relay-chain-minimal-node/src/network.rs +++ b/client/relay-chain-minimal-node/src/network.rs @@ -22,13 +22,12 @@ use sc_network::{ NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig, }, peer_store::PeerStore, - NetworkService, + NetworkService, NotificationService, }; use sc_network::config::FullNetworkConfiguration; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle}; -use sc_utils::mpsc::tracing_unbounded; use std::{iter, sync::Arc}; @@ -44,7 +43,7 @@ pub(crate) fn build_collator_network( Error, > { let protocol_id = config.protocol_id(); - let block_announce_config = get_block_announce_proto_config::( + let (block_announce_config, _notification_service) = get_block_announce_proto_config::( protocol_id.clone(), &None, Roles::from(&config.role), @@ -64,8 +63,6 @@ pub(crate) fn build_collator_network( let peer_store_handle = peer_store.handle(); spawn_handle.spawn("peer-store", Some("networking"), peer_store.run()); - // RX is not used for anything because syncing is not started for the minimal node - let (tx, _rx) = tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let network_params = sc_network::config::Params:: { role: config.role.clone(), executor: { @@ -81,7 +78,6 @@ pub(crate) fn build_collator_network( protocol_id, metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, - tx, }; let network_worker = sc_network::NetworkWorker::new(network_params)?; @@ -133,7 +129,7 @@ fn get_block_announce_proto_config( best_number: NumberFor, best_hash: B::Hash, genesis_hash: B::Hash, -) -> NonDefaultSetConfig { +) -> (NonDefaultSetConfig, Box) { let block_announces_protocol = { let genesis_hash = genesis_hash.as_ref(); if let Some(ref fork_id) = fork_id { @@ -143,12 +139,11 @@ fn get_block_announce_proto_config( } }; - NonDefaultSetConfig { - notifications_protocol: block_announces_protocol.into(), - fallback_names: iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()) - .collect(), - max_notification_size: 1024 * 1024, - handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( + NonDefaultSetConfig::new( + block_announces_protocol.into(), + iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(), + 1024 * 1024, + Some(NotificationHandshake::new(BlockAnnouncesHandshake::::build( roles, best_number, best_hash, @@ -156,11 +151,11 @@ fn get_block_announce_proto_config( ))), // NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement // protocol is still hardcoded into the peerset. - set_config: SetConfig { + SetConfig { in_peers: 0, out_peers: 0, reserved_nodes: Vec::new(), non_reserved_mode: NonReservedPeerMode::Deny, }, - } + ) }