Skip to content

Commit

Permalink
Rework the event system of sc-network
Browse files Browse the repository at this point in the history
This commit introduces a new concept called `NotificationService` which
allows Polkadot protocols to communicate with the underlying
notification protocol implementation directly, without routing events
through `NetworkWorker`. This implies that each protocol has its own
service which it uses to communicate with remote peers and that each
`NotificationService` is unique with respect to the underlying
notification protocol, meaning `NotificationService` for the transaction
protocol can only be used to send and receive transaction-related
notifications.

The `NotificationService` concept introduces two additional benefits:
  * allow protocols to start using custom handshakes
  * allow protocols to accept/reject inbound peers

Previously the validation of inbound connections was solely
the responsibility of `ProtocolController`. This caused issues with
light peers and `SyncingEngine` as `ProtocolController` would accept
more peers than `SyncingEngine` could accept which caused peers to have
differing views of their own states. `SyncingEngine` would reject excess
peers but these rejections were not properly communicated to those peers
causing them to assume that they were accepted.

With `NotificationService`, the local handshake is not sent to the remote
peer if the peer is rejected, which allows it to detect that it was rejected.

This commit also deprecates the use of `NetworkEventStream` for all
notification-related events and going forward only DHT events are
provided through `NetworkEventStream`. If protocols wish to follow
each other's events, they must introduce additional abstractions, as is
done for the GRANDPA and transaction protocols by following the syncing
protocol through `SyncEventStream`.

Fixes #512
Fixes #514
Fixes #515
Fixes #554
Fixes #556

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
  • Loading branch information
altonen and dmitry-markin committed Sep 2, 2023
1 parent 8cfaad4 commit 15d73c0
Show file tree
Hide file tree
Showing 66 changed files with 5,159 additions and 1,996 deletions.
220 changes: 112 additions & 108 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ schnellru = "0.2.1"
tracing = "0.1.37"
async-trait = "0.1.73"
futures = "0.3.28"
parking_lot = "0.12.1"

[features]
network-protocol-staging = [
Expand Down
15 changes: 12 additions & 3 deletions cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::{select, StreamExt};
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
Expand All @@ -27,7 +28,7 @@ use polkadot_network_bridge::{
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1::{self, AvailableDataFetchingRequest},
vstaging, IncomingRequestReceiver, ReqProtocolNames,
Expand All @@ -41,7 +42,7 @@ use polkadot_overseer::{
use polkadot_primitives::CollatorPair;

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::NetworkStateInfo;
use sc_network::{NetworkStateInfo, NotificationService};
use sc_service::TaskManager;
use sp_runtime::traits::Block as BlockT;

Expand Down Expand Up @@ -77,6 +78,8 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
/// Notification services for validation/collation protocols.
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}

fn build_overseer(
Expand All @@ -94,13 +97,16 @@ fn build_overseer(
collator_pair,
req_protocol_names,
peer_set_protocol_names,
notification_services,
}: CollatorOverseerGenArgs<'_>,
) -> Result<
(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
RelayChainError,
> {
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));

let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::with_availability_store_skip(
Expand Down Expand Up @@ -131,13 +137,16 @@ fn build_overseer(
sync_oracle,
network_bridge_metrics.clone(),
peer_set_protocol_names.clone(),
notification_services,
notification_sinks.clone(),
))
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service,
authority_discovery_service,
network_bridge_metrics,
req_protocol_names,
peer_set_protocol_names,
notification_sinks,
))
.provisioner(DummySubsystem)
.runtime_api(RuntimeApiSubsystem::new(
Expand Down
14 changes: 9 additions & 5 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterf
use network::build_collator_network;
use polkadot_network_bridge::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1, vstaging, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
},
Expand Down Expand Up @@ -176,10 +176,13 @@ async fn new_minimal_relay_chain(
let peer_set_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };

for config in peer_sets_info(is_authority, &peer_set_protocol_names) {
net_config.add_notification_protocol(config);
}
let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
(peerset, service)
})
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_receiver_v1, collation_req_receiver_vstaging, available_data_req_receiver) =
Expand Down Expand Up @@ -219,6 +222,7 @@ async fn new_minimal_relay_chain(
collator_pair,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
notification_services,
};

let overseer_handle =
Expand Down
25 changes: 10 additions & 15 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ use sc_network::{
NetworkService,
};

use sc_network::config::FullNetworkConfiguration;
use sc_network::{config::FullNetworkConfiguration, NotificationService};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
use sc_utils::mpsc::tracing_unbounded;

use std::{iter, sync::Arc};

Expand All @@ -44,7 +43,7 @@ pub(crate) fn build_collator_network(
Error,
> {
let protocol_id = config.protocol_id();
let block_announce_config = get_block_announce_proto_config::<Block>(
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Block>(
protocol_id.clone(),
&None,
Roles::from(&config.role),
Expand All @@ -64,8 +63,6 @@ pub(crate) fn build_collator_network(
let peer_store_handle = peer_store.handle();
spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

// RX is not used for anything because syncing is not started for the minimal node
let (tx, _rx) = tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let network_params = sc_network::config::Params::<Block> {
role: config.role.clone(),
executor: {
Expand All @@ -81,7 +78,6 @@ pub(crate) fn build_collator_network(
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
};

let network_worker = sc_network::NetworkWorker::new(network_params)?;
Expand Down Expand Up @@ -133,7 +129,7 @@ fn get_block_announce_proto_config<B: BlockT>(
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> NonDefaultSetConfig {
) -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
if let Some(ref fork_id) = fork_id {
Expand All @@ -143,24 +139,23 @@ fn get_block_announce_proto_config<B: BlockT>(
}
};

NonDefaultSetConfig {
notifications_protocol: block_announces_protocol.into(),
fallback_names: iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into())
.collect(),
max_notification_size: 1024 * 1024,
handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
NonDefaultSetConfig::new(
block_announces_protocol.into(),
iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1024 * 1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
roles,
best_number,
best_hash,
genesis_hash,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
set_config: SetConfig {
SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
)
}
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(crate) enum WireMessage<M> {
ViewUpdate(View),
}

#[derive(Debug)]
pub(crate) struct PeerData {
/// The latest view sent by the peer.
view: View,
Expand Down
60 changes: 29 additions & 31 deletions polkadot/node/network/bridge/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use parking_lot::Mutex;

use parity_scale_codec::Encode;

use sc_network::{
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, Event as NetworkEvent,
IfDisconnected, NetworkEventStream, NetworkNotification, NetworkPeers, NetworkRequest,
NetworkService, OutboundFailure, ReputationChange, RequestFailure,
config::parse_addr, multiaddr::Multiaddr, types::ProtocolName, IfDisconnected, MessageSink,
NetworkPeers, NetworkRequest, NetworkService, OutboundFailure, ReputationChange,
RequestFailure,
};

use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames, ProtocolVersion},
peer_set::{PeerSet, ProtocolVersion},
request_response::{OutgoingRequest, Recipient, ReqProtocolNames, Requests},
PeerId,
};
Expand All @@ -45,51 +48,50 @@ const LOG_TARGET: &'static str = "parachain::network-bridge-net";
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) fn send_message<M>(
net: &mut impl Network,
mut peers: Vec<PeerId>,
peer_set: PeerSet,
version: ProtocolVersion,
protocol_names: &PeerSetProtocolNames,
message: M,
metrics: &super::Metrics,
network_notification_sinks: &Arc<Mutex<HashMap<(PeerSet, PeerId), Box<dyn MessageSink>>>>,
) where
M: Encode + Clone,
{
if peers.is_empty() {
return
}

let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, version, encoded.len(), peers.len());
encoded
};

let notification_sinks = network_notification_sinks.lock();

// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
//
// the peer may have been disconnected by the time `send_message()` is called,
// at which point the sink is no longer available.
let last_peer = peers.pop();

// We always send messages on the "main" name even when a negotiated
// fallback is used. The libp2p implementation handles the fallback
// under the hood.
let protocol_name = protocol_names.get_main_name(peer_set);
peers.into_iter().for_each(|peer| {
net.write_notification(peer, protocol_name.clone(), message.clone());
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
});

if let Some(peer) = last_peer {
net.write_notification(peer, protocol_name, message);
if let Some(sink) = notification_sinks.get(&(peer_set, peer)) {
sink.send_sync_notification(message.clone());
}
}
}

/// An abstraction over networking for the purposes of this subsystem.
#[async_trait]
pub trait Network: Clone + Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
/// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
Expand Down Expand Up @@ -121,16 +123,12 @@ pub trait Network: Clone + Send + 'static {
/// Disconnect a given peer from the protocol specified without harming reputation.
fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName);

/// Write a notification to a peer on the given protocol.
fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>);
/// Get peer role.
fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole>;
}

#[async_trait]
impl Network for Arc<NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}

async fn set_reserved_peers(
&mut self,
protocol: ProtocolName,
Expand All @@ -155,10 +153,6 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::disconnect_peer(&**self, who, protocol);
}

fn write_notification(&self, who: PeerId, protocol: ProtocolName, message: Vec<u8>) {
NetworkService::write_notification(&**self, who, protocol, message);
}

async fn start_request<AD: AuthorityDiscovery>(
&self,
authority_discovery: &mut AD,
Expand Down Expand Up @@ -230,6 +224,10 @@ impl Network for Arc<NetworkService<Block, Hash>> {
if_disconnected,
);
}

fn peer_role(&self, who: PeerId, handshake: Vec<u8>) -> Option<sc_network::ObservedRole> {
NetworkService::peer_role(self, who, handshake)
}
}

/// We assume one `peer_id` per `authority_id`.
Expand Down
Loading

0 comments on commit 15d73c0

Please sign in to comment.