diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 00c414693a4..38bccdb91bf 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -635,6 +635,35 @@ zombienet-0005-migrate_solo_to_para: tags: - zombienet-polkadot-integration-test +0006-rpc_collator_builds_blocks: + stage: integration-test + image: "${ZOMBIENET_IMAGE}" + <<: *zombienet-refs + needs: + - job: build-push-image-test-parachain + variables: + POLKADOT_IMAGE: "docker.io/paritypr/polkadot-debug:master" + GH_DIR: "https://github.com/paritytech/cumulus/tree/${CI_COMMIT_SHORT_SHA}/zombienet_tests" + COL_IMAGE: "docker.io/paritypr/test-parachain:${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA}" + before_script: + - echo "Zombie-net Tests Config" + - echo "${ZOMBIENET_IMAGE}" + - echo "${RELAY_IMAGE}" + - echo "${COL_IMAGE}" + - echo "${GH_DIR}" + - export DEBUG=zombie + - export RELAY_IMAGE=${POLKADOT_IMAGE} + - export COL_IMAGE=${COL_IMAGE} + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-env-manager.sh + --github-remote-dir="${GH_DIR}" + --concurrency=1 + --test="0006-rpc_collator_builds_blocks.feature" + allow_failure: true + retry: 2 + tags: + - zombienet-polkadot-integration-test + #### stage: .post # This job cancels the whole pipeline if any of provided jobs fail. diff --git a/Cargo.lock b/Cargo.lock index be498e906c8..5c39be1c946 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1692,6 +1692,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", + "portpicker", "rand 0.8.5", "sc-cli", "sc-client-api", @@ -1959,6 +1960,7 @@ dependencies = [ "async-trait", "cumulus-primitives-core", "cumulus-relay-chain-interface", + "cumulus-test-service", "futures", "futures-timer", "polkadot-cli", @@ -1966,6 +1968,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-client", + "prioritized-metered-channel", "sc-cli", "sc-client-api", "sc-sysinfo", @@ -1997,6 +2000,48 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cumulus-relay-chain-minimal-node" +version = "0.1.0" +dependencies = [ + "async-trait", + "cumulus-primitives-core", + "cumulus-relay-chain-interface", + "cumulus-relay-chain-rpc-interface", + "futures", + "lru 0.8.0", + "polkadot-availability-distribution", + "polkadot-core-primitives", + "polkadot-network-bridge", + "polkadot-node-core-av-store", + "polkadot-node-network-protocol", + "polkadot-node-subsystem-util", + "polkadot-overseer", + "polkadot-primitives", + "polkadot-service", + "sc-authority-discovery", + "sc-client-api", + "sc-consensus", + "sc-keystore", + "sc-network", + "sc-network-common", + "sc-network-light", + "sc-network-sync", + "sc-service", + "sc-telemetry", + "sc-tracing", + "sc-transaction-pool", + "sc-transaction-pool-api", + "sp-api", + "sp-blockchain", + "sp-consensus", + "sp-consensus-babe", + "sp-runtime", + "tokio", + "tracing", + "url", +] + [[package]] name = "cumulus-relay-chain-rpc-interface" version = "0.1.0" @@ -2012,6 +2057,9 @@ dependencies = [ "polkadot-service", "sc-client-api", "sc-rpc-api", + "sp-api", + "sp-authority-discovery", + "sp-consensus-babe", "sp-core", "sp-runtime", "sp-state-machine", @@ -2119,6 +2167,7 @@ dependencies = [ "cumulus-primitives-parachain-inherent", "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", + "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-rpc-interface", "cumulus-test-relay-validation-worker-provider", "cumulus-test-runtime", @@ -6484,6 +6533,7 @@ dependencies = [ "cumulus-primitives-parachain-inherent", "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", + "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-rpc-interface", "frame-benchmarking", "frame-benchmarking-cli", @@ -7747,6 +7797,7 @@ dependencies = [ "cumulus-primitives-parachain-inherent", "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", + "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-rpc-interface", "frame-benchmarking", "frame-benchmarking-cli", diff --git a/Cargo.toml b/Cargo.toml index 487fc124da8..cacb7bea7f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "client/relay-chain-interface", "client/relay-chain-inprocess-interface", "client/relay-chain-rpc-interface", + "client/relay-chain-minimal-node", "pallets/aura-ext", "pallets/collator-selection", "pallets/dmp-queue", diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 9fea382a7ee..6afe0c53b66 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -264,7 +264,8 @@ impl sc_cli::CliConfiguration for ExportGenesisWasmCommand { fn validate_relay_chain_url(arg: &str) -> Result { let url = Url::parse(arg).map_err(|e| e.to_string())?; - if url.scheme() == "ws" { + let scheme = url.scheme(); + if scheme == "ws" || scheme == "wss" { Ok(url) } else { Err(format!( @@ -290,9 +291,8 @@ pub struct RunCmd { /// EXPERIMENTAL: Specify an URL to a relay chain full node to communicate with. #[clap( long, - value_parser = validate_relay_chain_url, - conflicts_with_all = &["alice", "bob", "charlie", "dave", "eve", "ferdie", "one", "two"] ) - ] + value_parser = validate_relay_chain_url + )] pub relay_chain_rpc_url: Option, } diff --git a/client/network/src/tests.rs b/client/network/src/tests.rs index c2093c75ada..cef327b8763 100644 --- a/client/network/src/tests.rs +++ b/client/network/src/tests.rs @@ -174,7 +174,7 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(false) } - fn overseer_handle(&self) -> RelayChainResult> { + fn overseer_handle(&self) -> RelayChainResult { unimplemented!("Not needed for test") } diff --git a/client/pov-recovery/Cargo.toml b/client/pov-recovery/Cargo.toml index 5bc5adc87da..530e3340ec9 100644 --- a/client/pov-recovery/Cargo.toml +++ b/client/pov-recovery/Cargo.toml @@ -31,6 +31,7 @@ cumulus-relay-chain-interface = {path = "../relay-chain-interface"} [dev-dependencies] tokio = { version = "1.21.1", features = ["macros"] } +portpicker = "0.1.1" # Cumulus cumulus-test-service = { path = "../../test/service" } diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index aeff69d56c5..3327d4bb86b 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -181,7 +181,7 @@ where Ok(_) => return, Err(e) => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, error = ?e, block_hash = ?hash, "Failed to get block status", @@ -190,6 +190,7 @@ where }, } + tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate"); if self .pending_candidates .insert( @@ -233,6 +234,7 @@ where None => return, }; + tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request"); self.active_candidate_recovery .recover_candidate(block_hash, pending_candidate) .await; @@ -301,7 +303,7 @@ where Ok(BlockStatus::Unknown) => { if self.active_candidate_recovery.is_being_recovered(&parent) { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, ?block_hash, parent_hash = ?parent, "Parent is still being recovered, waiting.", @@ -311,7 +313,7 @@ where return } else { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, ?block_hash, parent_hash = ?parent, "Parent not found while trying to import recovered block.", @@ -324,7 +326,7 @@ where }, Err(error) => { tracing::debug!( - target: "cumulus-consensus", + target: LOG_TARGET, block_hash = ?parent, ?error, "Error while checking block status", @@ -346,6 +348,8 @@ where /// This will also recursivley drain `waiting_for_parent` and import them as well. async fn import_block(&mut self, block: Block) { let mut blocks = VecDeque::new(); + + tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery"); blocks.push_back(block); let mut incoming_blocks = Vec::new(); diff --git a/client/pov-recovery/tests/pov_recovery.rs b/client/pov-recovery/tests/pov_recovery.rs index bd93d00dd4c..dd8be634896 100644 --- a/client/pov-recovery/tests/pov_recovery.rs +++ b/client/pov-recovery/tests/pov_recovery.rs @@ -16,6 +16,7 @@ use cumulus_primitives_core::ParaId; use cumulus_test_service::{initial_head_data, Keyring::*}; +use futures::join; use std::sync::Arc; /// Tests the PoV recovery. @@ -34,12 +35,13 @@ async fn pov_recovery() { let tokio_handle = tokio::runtime::Handle::current(); // Start alice + let ws_port = portpicker::pick_unused_port().expect("No free ports"); let alice = cumulus_test_service::run_relay_chain_validator_node( tokio_handle.clone(), Alice, || {}, Vec::new(), - None, + Some(ws_port), ); // Start bob @@ -90,10 +92,25 @@ async fn pov_recovery() { .build() .await; - let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve) + let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Eve) + .use_null_consensus() + .connect_to_parachain_node(&charlie) + .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .wrap_announce_block(|_| { + // Never announce any block + Arc::new(|_, _| {}) + }) + .build() + .await; + + // Run ferdie as parachain RPC collator and one as parachain RPC full node + // + // They will need to recover the pov blocks through availability recovery. + let ferdie = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Ferdie) .use_null_consensus() .connect_to_parachain_node(&charlie) .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .use_external_relay_chain_node_at_port(ws_port) .wrap_announce_block(|_| { // Never announce any block Arc::new(|_, _| {}) @@ -101,5 +118,23 @@ async fn pov_recovery() { .build() .await; - futures::future::join(dave.wait_for_blocks(7), eve.wait_for_blocks(7)).await; + let one = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, One) + .enable_collator() + .use_null_consensus() + .connect_to_parachain_node(&charlie) + .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .use_external_relay_chain_node_at_port(ws_port) + .wrap_announce_block(|_| { + // Never announce any block + Arc::new(|_, _| {}) + }) + .build() + .await; + + join!( + dave.wait_for_blocks(7), + eve.wait_for_blocks(7), + ferdie.wait_for_blocks(7), + one.wait_for_blocks(7) + ); } diff --git a/client/relay-chain-inprocess-interface/Cargo.toml b/client/relay-chain-inprocess-interface/Cargo.toml index e2c38c87de7..80afe12228d 100644 --- a/client/relay-chain-inprocess-interface/Cargo.toml +++ b/client/relay-chain-inprocess-interface/Cargo.toml @@ -38,3 +38,7 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master # Polkadot polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } +metered = { package = "prioritized-metered-channel", version = "0.2.0" } + +# Cumulus +cumulus-test-service = { path = "../../test/service" } diff --git a/client/relay-chain-inprocess-interface/src/lib.rs b/client/relay-chain-inprocess-interface/src/lib.rs index d713ab9cf38..70df07ad478 100644 --- a/client/relay-chain-inprocess-interface/src/lib.rs +++ b/client/relay-chain-inprocess-interface/src/lib.rs @@ -50,7 +50,7 @@ pub struct RelayChainInProcessInterface { full_client: Arc, backend: Arc, sync_oracle: Arc, - overseer_handle: Option, + overseer_handle: Handle, } impl RelayChainInProcessInterface { @@ -59,7 +59,7 @@ impl RelayChainInProcessInterface { full_client: Arc, backend: Arc, sync_oracle: Arc, - overseer_handle: Option, + overseer_handle: Handle, ) -> Self { Self { full_client, backend, sync_oracle, overseer_handle } } @@ -171,7 +171,7 @@ where Ok(self.sync_oracle.is_major_syncing()) } - fn overseer_handle(&self) -> RelayChainResult> { + fn overseer_handle(&self) -> RelayChainResult { Ok(self.overseer_handle.clone()) } @@ -288,7 +288,7 @@ struct RelayChainInProcessInterfaceBuilder { polkadot_client: polkadot_client::Client, backend: Arc, sync_oracle: Arc, - overseer_handle: Option, + overseer_handle: Handle, } impl RelayChainInProcessInterfaceBuilder { @@ -378,7 +378,9 @@ pub fn build_inprocess_relay_chain( polkadot_client: full_node.client.clone(), backend: full_node.backend.clone(), sync_oracle, - overseer_handle: full_node.overseer_handle.clone(), + overseer_handle: full_node.overseer_handle.clone().ok_or(RelayChainError::GenericError( + "Overseer not running in full node.".to_string(), + ))?, }; task_manager.add_child(full_node.task_manager); @@ -425,10 +427,12 @@ mod tests { let block = block_builder.build().expect("Finalizes the block").block; let dummy_network: Arc = Arc::new(DummyNetwork {}); + let (tx, _rx) = metered::channel(30); + let mock_handle = Handle::new(tx); ( client.clone(), block, - RelayChainInProcessInterface::new(client, backend.clone(), dummy_network, None), + RelayChainInProcessInterface::new(client, backend.clone(), dummy_network, mock_handle), ) } diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 91cfb531b73..4505ac70973 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -23,7 +23,8 @@ use cumulus_primitives_core::{ }, InboundDownwardMessage, ParaId, PersistedValidationData, }; -use polkadot_overseer::Handle as OverseerHandle; +use polkadot_overseer::{prometheus::PrometheusError, Handle as OverseerHandle}; +use polkadot_service::SubstrateServiceError; use sc_client_api::StorageProof; use futures::Stream; @@ -58,18 +59,34 @@ pub enum RelayChainError { WorkerCommunicationError(String), #[error("Scale codec deserialization error: {0}")] DeserializationError(CodecError), - #[error("Scale codec deserialization error: {0}")] + #[error("Polkadot service error: {0}")] ServiceError(#[from] polkadot_service::Error), + #[error("Substrate service error: {0}")] + SubServiceError(#[from] SubstrateServiceError), + #[error("Prometheus error: {0}")] + PrometheusError(#[from] PrometheusError), #[error("Unspecified error occured: {0}")] GenericError(String), } +impl From for ApiError { + fn from(r: RelayChainError) -> Self { + sp_api::ApiError::Application(Box::new(r)) + } +} + impl From for RelayChainError { fn from(e: CodecError) -> Self { RelayChainError::DeserializationError(e) } } +impl From for sp_blockchain::Error { + fn from(r: RelayChainError) -> Self { + sp_blockchain::Error::Application(Box::new(r)) + } +} + /// Trait that provides all necessary methods for interaction between collator and relay chain. #[async_trait] pub trait RelayChainInterface: Send + Sync { @@ -155,7 +172,7 @@ pub trait RelayChainInterface: Send + Sync { async fn is_major_syncing(&self) -> RelayChainResult; /// Get a handle to the overseer. - fn overseer_handle(&self) -> RelayChainResult>; + fn overseer_handle(&self) -> RelayChainResult; /// Generate a storage read proof. async fn prove_read( @@ -233,7 +250,7 @@ where (**self).is_major_syncing().await } - fn overseer_handle(&self) -> RelayChainResult> { + fn overseer_handle(&self) -> RelayChainResult { (**self).overseer_handle() } diff --git a/client/relay-chain-minimal-node/Cargo.toml b/client/relay-chain-minimal-node/Cargo.toml new file mode 100644 index 00000000000..6c9cb255363 --- /dev/null +++ b/client/relay-chain-minimal-node/Cargo.toml @@ -0,0 +1,49 @@ +[package] +authors = ["Parity Technologies "] +name = "cumulus-relay-chain-minimal-node" +version = "0.1.0" +edition = "2021" + +[dependencies] +# polkadot deps +polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-subsystem-util = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-network-protocol = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-network-bridge = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-core-av-store = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-availability-distribution = { git = "https://github.com/paritytech/polkadot", branch = "master" } + +# substrate deps +sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network-sync = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network-common = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network-light = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } + +# cumulus deps +cumulus-relay-chain-interface = { path = "../relay-chain-interface" } +cumulus-relay-chain-rpc-interface = { path = "../relay-chain-rpc-interface" } +cumulus-primitives-core = { path = "../../primitives/core" } + +lru = "0.8" +tracing = "0.1.25" +async-trait = "0.1.52" +futures = "0.3.24" +url = "2.2.2" +tokio = { version = "1.17.0", features = ["macros"] } diff --git a/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs b/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs new file mode 100644 index 00000000000..bf1a3c9a38c --- /dev/null +++ b/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs @@ -0,0 +1,463 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use std::{pin::Pin, str::FromStr}; + +use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; +use cumulus_relay_chain_rpc_interface::RelayChainRpcClient; +use futures::{Future, Stream, StreamExt}; +use polkadot_core_primitives::{Block, BlockId, Hash, Header}; +use polkadot_overseer::RuntimeApiSubsystemClient; +use polkadot_service::{AuxStore, HeaderBackend}; +use sc_authority_discovery::AuthorityDiscovery; + +use sc_network_common::config::MultiaddrWithPeerId; +use sp_api::{ApiError, RuntimeApiInfo}; +use sp_blockchain::Info; + +const LOG_TARGET: &str = "blockchain-rpc-client"; + +#[derive(Clone)] +pub struct BlockChainRpcClient { + rpc_client: RelayChainRpcClient, +} + +impl BlockChainRpcClient { + pub fn new(rpc_client: RelayChainRpcClient) -> Self { + Self { rpc_client } + } + + pub async fn chain_get_header( + &self, + hash: Option, + ) -> Result, RelayChainError> { + self.rpc_client.chain_get_header(hash).await + } + + pub async fn block_get_hash( + &self, + number: Option, + ) -> Result, RelayChainError> { + self.rpc_client.chain_get_block_hash(number).await + } +} + +// Implementation required by Availability-Distribution subsystem +// but never called in our case. +impl AuxStore for BlockChainRpcClient { + fn insert_aux< + 'a, + 'b: 'a, + 'c: 'a, + I: IntoIterator, + D: IntoIterator, + >( + &self, + _insert: I, + _delete: D, + ) -> sp_blockchain::Result<()> { + unimplemented!("Not supported on the RPC collator") + } + + fn get_aux(&self, _key: &[u8]) -> sp_blockchain::Result>> { + unimplemented!("Not supported on the RPC collator") + } +} + +#[async_trait::async_trait] +impl RuntimeApiSubsystemClient for BlockChainRpcClient { + async fn validators( + &self, + at: Hash, + ) -> Result, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_validators(at).await?) + } + + async fn validator_groups( + &self, + at: Hash, + ) -> Result< + ( + Vec>, + polkadot_primitives::v2::GroupRotationInfo, + ), + sp_api::ApiError, + > { + Ok(self.rpc_client.parachain_host_validator_groups(at).await?) + } + + async fn availability_cores( + &self, + at: Hash, + ) -> Result< + Vec>, + sp_api::ApiError, + > { + Ok(self.rpc_client.parachain_host_availability_cores(at).await?) + } + + async fn persisted_validation_data( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + assumption: polkadot_primitives::v2::OccupiedCoreAssumption, + ) -> Result< + Option< + cumulus_primitives_core::PersistedValidationData< + Hash, + polkadot_core_primitives::BlockNumber, + >, + >, + sp_api::ApiError, + > { + Ok(self + .rpc_client + .parachain_host_persisted_validation_data(at, para_id, assumption) + .await?) + } + + async fn assumed_validation_data( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + expected_persisted_validation_data_hash: Hash, + ) -> Result< + Option<( + cumulus_primitives_core::PersistedValidationData< + Hash, + polkadot_core_primitives::BlockNumber, + >, + polkadot_primitives::v2::ValidationCodeHash, + )>, + sp_api::ApiError, + > { + Ok(self + .rpc_client + .parachain_host_assumed_validation_data( + at, + para_id, + expected_persisted_validation_data_hash, + ) + .await?) + } + + async fn check_validation_outputs( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + outputs: polkadot_primitives::v2::CandidateCommitments, + ) -> Result { + Ok(self + .rpc_client + .parachain_host_check_validation_outputs(at, para_id, outputs) + .await?) + } + + async fn session_index_for_child( + &self, + at: Hash, + ) -> Result { + Ok(self.rpc_client.parachain_host_session_index_for_child(at).await?) + } + + async fn validation_code( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + assumption: polkadot_primitives::v2::OccupiedCoreAssumption, + ) -> Result, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_validation_code(at, para_id, assumption).await?) + } + + async fn candidate_pending_availability( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + ) -> Result>, sp_api::ApiError> + { + Ok(self + .rpc_client + .parachain_host_candidate_pending_availability(at, para_id) + .await?) + } + + async fn candidate_events( + &self, + at: Hash, + ) -> Result>, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_candidate_events(at).await?) + } + + async fn dmq_contents( + &self, + at: Hash, + recipient: cumulus_primitives_core::ParaId, + ) -> Result< + Vec>, + sp_api::ApiError, + > { + Ok(self.rpc_client.parachain_host_dmq_contents(recipient, at).await?) + } + + async fn inbound_hrmp_channels_contents( + &self, + at: Hash, + recipient: cumulus_primitives_core::ParaId, + ) -> Result< + std::collections::BTreeMap< + cumulus_primitives_core::ParaId, + Vec< + polkadot_core_primitives::InboundHrmpMessage, + >, + >, + sp_api::ApiError, + > { + Ok(self + .rpc_client + .parachain_host_inbound_hrmp_channels_contents(recipient, at) + .await?) + } + + async fn validation_code_by_hash( + &self, + at: Hash, + validation_code_hash: polkadot_primitives::v2::ValidationCodeHash, + ) -> Result, sp_api::ApiError> { + Ok(self + .rpc_client + .parachain_host_validation_code_by_hash(at, validation_code_hash) + .await?) + } + + async fn on_chain_votes( + &self, + at: Hash, + ) -> Result>, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_on_chain_votes(at).await?) + } + + async fn session_info( + &self, + at: Hash, + index: polkadot_primitives::v2::SessionIndex, + ) -> Result, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_session_info(at, index).await?) + } + + async fn session_info_before_version_2( + &self, + at: Hash, + index: polkadot_primitives::v2::SessionIndex, + ) -> Result, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_session_info_before_version_2(at, index).await?) + } + + async fn submit_pvf_check_statement( + &self, + at: Hash, + stmt: polkadot_primitives::v2::PvfCheckStatement, + signature: polkadot_primitives::v2::ValidatorSignature, + ) -> Result<(), sp_api::ApiError> { + Ok(self + .rpc_client + .parachain_host_submit_pvf_check_statement(at, stmt, signature) + .await?) + } + + async fn pvfs_require_precheck( + &self, + at: Hash, + ) -> Result, sp_api::ApiError> { + Ok(self.rpc_client.parachain_host_pvfs_require_precheck(at).await?) + } + + async fn validation_code_hash( + &self, + at: Hash, + para_id: cumulus_primitives_core::ParaId, + assumption: polkadot_primitives::v2::OccupiedCoreAssumption, + ) -> Result, sp_api::ApiError> { + Ok(self + .rpc_client + .parachain_host_validation_code_hash(at, para_id, assumption) + .await?) + } + + async fn current_epoch(&self, at: Hash) -> Result { + Ok(self.rpc_client.babe_api_current_epoch(at).await?) + } + + async fn authorities( + &self, + at: Hash, + ) -> std::result::Result, sp_api::ApiError> { + Ok(self.rpc_client.authority_discovery_authorities(at).await?) + } + + async fn api_version_parachain_host(&self, at: Hash) -> Result, sp_api::ApiError> { + let api_id = >::ID; + Ok(self.rpc_client.runtime_version(at).await.map(|v| v.api_version(&api_id))?) + } + + async fn disputes( + &self, + at: Hash, + ) -> Result< + Vec<( + polkadot_primitives::v2::SessionIndex, + polkadot_primitives::v2::CandidateHash, + polkadot_primitives::v2::DisputeState, + )>, + ApiError, + > { + Ok(self.rpc_client.parachain_host_staging_get_disputes(at).await?) + } +} + +#[async_trait::async_trait] +impl AuthorityDiscovery for BlockChainRpcClient { + async fn authorities( + &self, + at: Hash, + ) -> std::result::Result, sp_api::ApiError> { + let result = self.rpc_client.authority_discovery_authorities(at).await?; + Ok(result) + } +} + +impl BlockChainRpcClient { + pub async fn local_listen_addresses( + &self, + ) -> Result, RelayChainError> { + let addresses = self.rpc_client.system_local_listen_addresses().await?; + tracing::debug!(target: LOG_TARGET, ?addresses, "Fetched listen address from RPC node."); + + let mut result_vec = Vec::new(); + for address in addresses { + match MultiaddrWithPeerId::from_str(&address) { + Ok(addr) => result_vec.push(addr), + Err(err) => + return Err(RelayChainError::GenericError(format!( + "Failed to parse a local listen addresses from the RPC node: {}", + err + ))), + } + } + + Ok(result_vec) + } + + pub async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + Ok(self.rpc_client.get_imported_heads_stream().await?.boxed()) + } + + pub async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + Ok(self.rpc_client.get_finalized_heads_stream().await?.boxed()) + } +} + +fn block_local(fut: impl Future) -> T { + let tokio_handle = tokio::runtime::Handle::current(); + tokio::task::block_in_place(|| tokio_handle.block_on(fut)) +} + +impl HeaderBackend for BlockChainRpcClient { + fn header( + &self, + id: BlockId, + ) -> sp_blockchain::Result::Header>> { + let fetch_header = |hash| block_local(self.rpc_client.chain_get_header(Some(hash))); + + match id { + BlockId::Hash(hash) => Ok(fetch_header(hash)?), + BlockId::Number(number) => { + if let Some(hash) = HeaderBackend::::hash(self, number)? { + Ok(fetch_header(hash)?) + } else { + Ok(None) + } + }, + } + } + + fn info(&self) -> Info { + let best_header = block_local(self.rpc_client.chain_get_header(None)) + .expect("Unable to get header from relay chain.") + .unwrap(); + let genesis_hash = block_local(self.rpc_client.chain_get_head(Some(0))) + .expect("Unable to get header from relay chain."); + let finalized_head = block_local(self.rpc_client.chain_get_finalized_head()) + .expect("Unable to get finalized head from relay chain."); + let finalized_header = block_local(self.rpc_client.chain_get_header(Some(finalized_head))) + .expect("Unable to get finalized header from relay chain.") + .unwrap(); + Info { + best_hash: best_header.hash(), + best_number: best_header.number, + genesis_hash, + finalized_hash: finalized_head, + finalized_number: finalized_header.number, + finalized_state: None, + number_leaves: 1, + block_gap: None, + } + } + + fn status( + &self, + id: sp_api::BlockId, + ) -> sp_blockchain::Result { + let exists = match id { + BlockId::Hash(_) => self.header(id)?.is_some(), + BlockId::Number(n) => { + let best_header = block_local(self.rpc_client.chain_get_header(None))?; + if let Some(best) = best_header { + n < best.number + } else { + false + } + }, + }; + + if exists { + Ok(sc_client_api::blockchain::BlockStatus::InChain) + } else { + Ok(sc_client_api::blockchain::BlockStatus::Unknown) + } + } + + fn number( + &self, + hash: ::Hash, + ) -> sp_blockchain::Result< + Option<<::Header as polkadot_service::HeaderT>::Number>, + > { + let result = block_local(self.rpc_client.chain_get_header(Some(hash)))? + .map(|maybe_header| maybe_header.number); + Ok(result) + } + + fn hash( + &self, + number: polkadot_service::NumberFor, + ) -> sp_blockchain::Result::Hash>> { + Ok(block_local(self.rpc_client.chain_get_block_hash(number.into()))?) + } +} diff --git a/client/relay-chain-minimal-node/src/collator_overseer.rs b/client/relay-chain-minimal-node/src/collator_overseer.rs new file mode 100644 index 00000000000..6efb1a9ce2e --- /dev/null +++ b/client/relay-chain-minimal-node/src/collator_overseer.rs @@ -0,0 +1,274 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use cumulus_relay_chain_interface::RelayChainError; +use lru::LruCache; +use polkadot_availability_distribution::{ + AvailabilityDistributionSubsystem, IncomingRequestReceivers, +}; +use polkadot_node_core_av_store::Config; +use polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, + request_response::{ + v1::{ + AvailableDataFetchingRequest, ChunkFetchingRequest, CollationFetchingRequest, + PoVFetchingRequest, + }, + IncomingRequestReceiver, ReqProtocolNames, + }, +}; +use polkadot_node_subsystem_util::metrics::{prometheus::Registry, Metrics}; +use polkadot_overseer::{ + BlockInfo, DummySubsystem, MetricsTrait, Overseer, OverseerHandle, OverseerMetrics, SpawnGlue, + KNOWN_LEAVES_CACHE_SIZE, +}; +use polkadot_primitives::v2::CollatorPair; +use polkadot_service::{ + overseer::{ + AvailabilityRecoverySubsystem, AvailabilityStoreSubsystem, ChainApiSubsystem, + CollationGenerationSubsystem, CollatorProtocolSubsystem, NetworkBridgeMetrics, + NetworkBridgeRxSubsystem, NetworkBridgeTxSubsystem, ProtocolSide, RuntimeApiSubsystem, + }, + Error, OverseerConnector, +}; +use sc_authority_discovery::Service as AuthorityDiscoveryService; +use sc_keystore::LocalKeystore; +use sc_network::NetworkStateInfo; + +use std::sync::Arc; + +use cumulus_primitives_core::relay_chain::{Block, Hash as PHash}; + +use polkadot_service::{Handle, TaskManager}; + +use crate::BlockChainRpcClient; +use futures::{select, StreamExt}; +use sp_runtime::traits::Block as BlockT; + +/// Arguments passed for overseer construction. +pub(crate) struct CollatorOverseerGenArgs<'a> { + /// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others. + pub runtime_client: Arc, + /// Underlying network service implementation. + pub network_service: Arc>, + /// Underlying authority discovery service. + pub authority_discovery_service: AuthorityDiscoveryService, + // Receiver for collation request protocol + pub collation_req_receiver: IncomingRequestReceiver, + // Receiver for PoV request protocol + pub pov_req_receiver: IncomingRequestReceiver, + // Receiver for chunk request protocol + pub chunk_req_receiver: IncomingRequestReceiver, + // Receiver for availability request protocol + pub available_data_req_receiver: IncomingRequestReceiver, + /// Prometheus registry, commonly used for production systems, less so for test. + pub registry: Option<&'a Registry>, + /// Task spawner to be used throughout the overseer and the APIs it provides. + pub spawner: sc_service::SpawnTaskHandle, + /// Determines the behavior of the collator. + pub collator_pair: CollatorPair, + /// Request response protocols + pub req_protocol_names: ReqProtocolNames, + /// Peerset protocols name mapping + pub peer_set_protocol_names: PeerSetProtocolNames, + /// Config for the availability store + pub availability_config: Config, + /// The underlying key value store for the parachains. + pub parachains_db: Arc, +} + +fn build_overseer<'a>( + connector: OverseerConnector, + CollatorOverseerGenArgs { + runtime_client, + network_service, + authority_discovery_service, + collation_req_receiver, + available_data_req_receiver, + availability_config, + registry, + spawner, + collator_pair, + req_protocol_names, + peer_set_protocol_names, + parachains_db, + pov_req_receiver, + chunk_req_receiver, + }: CollatorOverseerGenArgs<'a>, +) -> Result< + (Overseer, Arc>, OverseerHandle), + Error, +> { + let leaves = Vec::new(); + let metrics = ::register(registry)?; + let keystore = Arc::new(LocalKeystore::in_memory()); + let spawner = SpawnGlue(spawner); + let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; + let builder = Overseer::builder() + .availability_distribution(AvailabilityDistributionSubsystem::new( + keystore.clone(), + IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, + Metrics::register(registry)?, + )) + .availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only( + available_data_req_receiver, + Metrics::register(registry)?, + )) + .availability_store(AvailabilityStoreSubsystem::new( + parachains_db.clone(), + availability_config, + Metrics::register(registry)?, + )) + .bitfield_distribution(DummySubsystem) + .bitfield_signing(DummySubsystem) + .candidate_backing(DummySubsystem) + .candidate_validation(DummySubsystem) + .pvf_checker(DummySubsystem) + .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?)) + .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?)) + .collator_protocol({ + let side = ProtocolSide::Collator( + network_service.local_peer_id().clone(), + collator_pair, + collation_req_receiver, + Metrics::register(registry)?, + ); + CollatorProtocolSubsystem::new(side) + }) + .network_bridge_rx(NetworkBridgeRxSubsystem::new( + network_service.clone(), + authority_discovery_service.clone(), + Box::new(network_service.clone()), + network_bridge_metrics.clone(), + peer_set_protocol_names.clone(), + )) + .network_bridge_tx(NetworkBridgeTxSubsystem::new( + network_service.clone(), + authority_discovery_service.clone(), + network_bridge_metrics, + req_protocol_names, + peer_set_protocol_names, + )) + .provisioner(DummySubsystem) + .runtime_api(RuntimeApiSubsystem::new( + runtime_client.clone(), + Metrics::register(registry)?, + spawner.clone(), + )) + .statement_distribution(DummySubsystem) + .approval_distribution(DummySubsystem) + .approval_voting(DummySubsystem) + .gossip_support(DummySubsystem) + .dispute_coordinator(DummySubsystem) + .dispute_distribution(DummySubsystem) + .chain_selection(DummySubsystem) + .leaves(Vec::from_iter( + leaves + .into_iter() + .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)), + )) + .activation_external_listeners(Default::default()) + .span_per_active_leaf(Default::default()) + .active_leaves(Default::default()) + .supports_parachains(runtime_client) + .known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE)) + .metrics(metrics) + .spawner(spawner); + + builder.build_with_connector(connector).map_err(|e| e.into()) +} + +pub(crate) fn spawn_overseer( + overseer_args: CollatorOverseerGenArgs, + task_manager: &TaskManager, + relay_chain_rpc_client: Arc, +) -> Result { + let (overseer, overseer_handle) = build_overseer(OverseerConnector::default(), overseer_args) + .map_err(|e| { + tracing::error!("Failed to initialize overseer: {}", e); + e + })?; + + let overseer_handle = Handle::new(overseer_handle.clone()); + { + let handle = overseer_handle.clone(); + task_manager.spawn_essential_handle().spawn_blocking( + "overseer", + None, + Box::pin(async move { + use futures::{pin_mut, FutureExt}; + + let forward = forward_collator_events(relay_chain_rpc_client, handle).fuse(); + + let overseer_fut = overseer.run().fuse(); + + pin_mut!(overseer_fut); + pin_mut!(forward); + + select! { + _ = forward => (), + _ = overseer_fut => (), + } + }), + ); + } + Ok(overseer_handle) +} + +/// Minimal relay chain node representation +pub struct NewMinimalNode { + /// Task manager running all tasks for the minimal node + pub task_manager: TaskManager, + /// Overseer handle to interact with subsystems + pub overseer_handle: Handle, + /// Network service + pub network: Arc::Hash>>, +} + +/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding +/// import and finality notifications into the [`OverseerHandle`]. +async fn forward_collator_events( + client: Arc, + mut handle: Handle, +) -> Result<(), RelayChainError> { + let mut finality = client.finality_notification_stream().await?.fuse(); + let mut imports = client.import_notification_stream().await?.fuse(); + + loop { + select! { + f = finality.next() => { + match f { + Some(header) => { + tracing::info!(target: "minimal-polkadot-node", "Received finalized block via RPC: #{} ({})", header.number, header.hash()); + let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number }; + handle.block_finalized(block_info).await; + } + None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())), + } + }, + i = imports.next() => { + match i { + Some(header) => { + tracing::info!(target: "minimal-polkadot-node", "Received imported block via RPC: #{} ({})", header.number, header.hash()); + let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number }; + handle.block_imported(block_info).await; + } + None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())), + } + } + } + } +} diff --git a/client/relay-chain-minimal-node/src/lib.rs b/client/relay-chain-minimal-node/src/lib.rs new file mode 100644 index 00000000000..60b8a809a9c --- /dev/null +++ b/client/relay-chain-minimal-node/src/lib.rs @@ -0,0 +1,223 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use collator_overseer::{CollatorOverseerGenArgs, NewMinimalNode}; + +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_rpc_interface::{RelayChainRpcInterface, Url}; +use polkadot_network_bridge::{peer_sets_info, IsAuthority}; +use polkadot_node_network_protocol::{ + peer_set::PeerSetProtocolNames, + request_response::{self, IncomingRequest, ReqProtocolNames}, +}; +use polkadot_node_subsystem_util::metrics::prometheus::Registry; +use polkadot_primitives::v2::CollatorPair; + +use sc_authority_discovery::Service as AuthorityDiscoveryService; +use sc_network::{Event, NetworkService}; +use sc_network_common::service::NetworkEventStream; +use std::sync::Arc; + +use polkadot_service::{open_database, Configuration, TaskManager}; + +use futures::StreamExt; + +use sp_runtime::{app_crypto::Pair, traits::Block as BlockT}; + +mod collator_overseer; + +mod network; + +mod blockchain_rpc_client; +pub use blockchain_rpc_client::BlockChainRpcClient; + +fn build_authority_discovery_service( + task_manager: &TaskManager, + client: Arc, + config: &Configuration, + network: Arc::Hash>>, + prometheus_registry: Option, +) -> AuthorityDiscoveryService { + let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; + let authority_discovery_role = sc_authority_discovery::Role::Discover; + let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { + match e { + Event::Dht(e) => Some(e), + _ => None, + } + }); + let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config( + sc_authority_discovery::WorkerConfig { + publish_non_global_ips: auth_disc_publish_non_global_ips, + // Require that authority discovery records are signed. + strict_record_validation: true, + ..Default::default() + }, + client, + network.clone(), + Box::pin(dht_event_stream), + authority_discovery_role, + prometheus_registry.clone(), + ); + + task_manager.spawn_handle().spawn( + "authority-discovery-worker", + Some("authority-discovery"), + worker.run(), + ); + service +} + +pub async fn build_minimal_relay_chain_node( + polkadot_config: Configuration, + task_manager: &mut TaskManager, + relay_chain_url: Url, +) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { + let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker( + relay_chain_url, + task_manager, + ) + .await?; + let collator_pair = CollatorPair::generate().0; + let collator_node = new_minimal_relay_chain( + polkadot_config, + collator_pair.clone(), + Arc::new(BlockChainRpcClient::new(client.clone())), + ) + .await?; + task_manager.add_child(collator_node.task_manager); + Ok(( + Arc::new(RelayChainRpcInterface::new(client, collator_node.overseer_handle)), + Some(collator_pair), + )) +} + +/// Builds a minimal relay chain node. Chain data is fetched +/// via [`BlockChainRpcClient`] and fed into the overseer and its subsystems. +/// +/// Instead of spawning all subsystems, this minimal node will only spawn subsystems +/// required to collate: +/// - AvailabilityRecovery +/// - CollationGeneration +/// - CollatorProtocol +/// - NetworkBridgeRx +/// - NetworkBridgeTx +/// - RuntimeApi +/// - ChainApi +/// - AvailabilityDistribution +#[sc_tracing::logging::prefix_logs_with("Relaychain")] +async fn new_minimal_relay_chain( + mut config: Configuration, + collator_pair: CollatorPair, + relay_chain_rpc_client: Arc, +) -> Result { + let role = config.role.clone(); + + // Use the given RPC node as bootnode, since we do not have a chain spec with valid boot nodes + let mut boot_node_address = relay_chain_rpc_client.local_listen_addresses().await?; + config.network.boot_nodes.append(&mut boot_node_address); + + let task_manager = { + let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry); + TaskManager::new(config.tokio_handle.clone(), registry)? + }; + + let prometheus_registry = config.prometheus_registry().cloned(); + + let genesis_hash = relay_chain_rpc_client + .block_get_hash(Some(0)) + .await + .expect("Genesis block hash is always available; qed") + .unwrap_or_default(); + + let peer_set_protocol_names = + PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); + let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No }; + config + .network + .extra_sets + .extend(peer_sets_info(is_authority, &peer_set_protocol_names)); + + let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); + let (collation_req_receiver, available_data_req_receiver, pov_req_receiver, chunk_req_receiver) = + build_request_response_protocol_receivers(&request_protocol_names, &mut config); + let (network, network_starter) = + network::build_collator_network(network::BuildCollatorNetworkParams { + config: &config, + client: relay_chain_rpc_client.clone(), + spawn_handle: task_manager.spawn_handle(), + genesis_hash, + })?; + + let authority_discovery_service = build_authority_discovery_service( + &task_manager, + relay_chain_rpc_client.clone(), + &config, + network.clone(), + prometheus_registry.clone(), + ); + + let parachains_db = open_database(&config.database)?; + + let overseer_args = CollatorOverseerGenArgs { + runtime_client: relay_chain_rpc_client.clone(), + network_service: network.clone(), + authority_discovery_service, + collation_req_receiver, + available_data_req_receiver, + registry: prometheus_registry.as_ref(), + spawner: task_manager.spawn_handle(), + collator_pair, + req_protocol_names: request_protocol_names, + peer_set_protocol_names, + parachains_db, + availability_config: polkadot_service::AVAILABILITY_CONFIG, + pov_req_receiver, + chunk_req_receiver, + }; + + let overseer_handle = collator_overseer::spawn_overseer( + overseer_args, + &task_manager, + relay_chain_rpc_client.clone(), + )?; + + network_starter.start_network(); + + Ok(NewMinimalNode { task_manager, overseer_handle, network }) +} + +fn build_request_response_protocol_receivers( + request_protocol_names: &ReqProtocolNames, + config: &mut Configuration, +) -> ( + request_response::IncomingRequestReceiver, + request_response::IncomingRequestReceiver, + request_response::IncomingRequestReceiver, + request_response::IncomingRequestReceiver, +) { + let (collation_req_receiver, cfg) = + IncomingRequest::get_config_receiver(request_protocol_names); + config.network.request_response_protocols.push(cfg); + let (available_data_req_receiver, cfg) = + IncomingRequest::get_config_receiver(request_protocol_names); + config.network.request_response_protocols.push(cfg); + let (pov_req_receiver, cfg) = IncomingRequest::get_config_receiver(request_protocol_names); + config.network.request_response_protocols.push(cfg); + let (chunk_req_receiver, cfg) = IncomingRequest::get_config_receiver(request_protocol_names); + config.network.request_response_protocols.push(cfg); + (collation_req_receiver, available_data_req_receiver, pov_req_receiver, chunk_req_receiver) +} diff --git a/client/relay-chain-minimal-node/src/network.rs b/client/relay-chain-minimal-node/src/network.rs new file mode 100644 index 00000000000..a5237f5ea65 --- /dev/null +++ b/client/relay-chain-minimal-node/src/network.rs @@ -0,0 +1,384 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use polkadot_core_primitives::{Block, Hash}; +use polkadot_service::{BlockT, NumberFor}; + +use polkadot_node_network_protocol::PeerId; +use sc_network::{NetworkService, SyncState}; + +use sc_network_common::sync::{Metrics, SyncStatus}; +use sc_network_light::light_client_requests; +use sc_network_sync::{block_request_handler, state_request_handler}; +use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle}; +use sp_consensus::BlockOrigin; +use sp_runtime::Justifications; + +use std::sync::Arc; + +use crate::BlockChainRpcClient; + +pub(crate) struct BuildCollatorNetworkParams<'a> { + /// The service configuration. + pub config: &'a Configuration, + /// A shared client returned by `new_full_parts`. + pub client: Arc, + /// A handle for spawning tasks. + pub spawn_handle: SpawnTaskHandle, + /// Genesis hash + pub genesis_hash: Hash, +} + +/// Build the network service, the network status sinks and an RPC sender. +pub(crate) fn build_collator_network( + params: BuildCollatorNetworkParams, +) -> Result<(Arc>, NetworkStarter), Error> { + let BuildCollatorNetworkParams { config, client, spawn_handle, genesis_hash } = params; + + let protocol_id = config.protocol_id(); + + let block_request_protocol_config = + block_request_handler::generate_protocol_config(&protocol_id, genesis_hash, None); + + let state_request_protocol_config = + state_request_handler::generate_protocol_config(&protocol_id, genesis_hash, None); + + let light_client_request_protocol_config = + light_client_requests::generate_protocol_config(&protocol_id, genesis_hash, None); + + let network_params = sc_network::config::Params { + role: config.role.clone(), + executor: { + let spawn_handle = Clone::clone(&spawn_handle); + Some(Box::new(move |fut| { + spawn_handle.spawn("libp2p-node", Some("networking"), fut); + })) + }, + fork_id: None, + chain_sync: Box::new(DummyChainSync), + network_config: config.network.clone(), + chain: client.clone(), + import_queue: Box::new(DummyImportQueue), + protocol_id, + metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), + block_request_protocol_config, + state_request_protocol_config, + warp_sync_protocol_config: None, + light_client_request_protocol_config, + request_response_protocol_configs: Vec::new(), + }; + + let network_worker = sc_network::NetworkWorker::new(network_params)?; + let network_service = network_worker.service().clone(); + + let (network_start_tx, network_start_rx) = futures::channel::oneshot::channel(); + + // The network worker is responsible for gathering all network messages and processing + // them. This is quite a heavy task, and at the time of the writing of this comment it + // frequently happens that this future takes several seconds or in some situations + // even more than a minute until it has processed its entire queue. This is clearly an + // issue, and ideally we would like to fix the network future to take as little time as + // possible, but we also take the extra harm-prevention measure to execute the networking + // future using `spawn_blocking`. + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { + if network_start_rx.await.is_err() { + tracing::warn!( + "The NetworkStart returned as part of `build_network` has been silently dropped" + ); + // This `return` might seem unnecessary, but we don't want to make it look like + // everything is working as normal even though the user is clearly misusing the API. + return + } + + network_worker.await + }); + + let network_starter = NetworkStarter::new(network_start_tx); + + Ok((network_service, network_starter)) +} + +/// Empty ChainSync shell. Syncing code is not necessary for +/// the minimal node, but network currently requires it. So +/// we provide a noop implementation. +struct DummyChainSync; + +impl sc_network_common::sync::ChainSync for DummyChainSync { + fn peer_info(&self, _who: &PeerId) -> Option> { + None + } + + fn status(&self) -> sc_network_common::sync::SyncStatus { + SyncStatus { + state: SyncState::Idle, + best_seen_block: None, + num_peers: 0, + queued_blocks: 0, + state_sync: None, + warp_sync: None, + } + } + + fn num_sync_requests(&self) -> usize { + 0 + } + + fn num_downloaded_blocks(&self) -> usize { + 0 + } + + fn num_peers(&self) -> usize { + 0 + } + + fn new_peer( + &mut self, + _who: PeerId, + _best_hash: ::Hash, + _best_number: polkadot_service::NumberFor, + ) -> Result< + Option>, + sc_network_common::sync::BadPeer, + > { + Ok(None) + } + + fn update_chain_info( + &mut self, + _best_hash: &::Hash, + _best_number: polkadot_service::NumberFor, + ) { + } + + fn request_justification( + &mut self, + _hash: &::Hash, + _number: polkadot_service::NumberFor, + ) { + } + + fn clear_justification_requests(&mut self) {} + + fn set_sync_fork_request( + &mut self, + _peers: Vec, + _hash: &::Hash, + _number: polkadot_service::NumberFor, + ) { + } + + fn justification_requests( + &mut self, + ) -> Box)> + '_> + { + Box::new(std::iter::empty()) + } + + fn block_requests( + &mut self, + ) -> Box)> + '_> + { + Box::new(std::iter::empty()) + } + + fn state_request(&mut self) -> Option<(PeerId, sc_network_common::sync::OpaqueStateRequest)> { + None + } + + fn warp_sync_request( + &mut self, + ) -> Option<(PeerId, sc_network_common::sync::warp::WarpProofRequest)> { + None + } + + fn on_block_data( + &mut self, + _who: &PeerId, + _request: Option>, + _response: sc_network_common::sync::message::BlockResponse, + ) -> Result, sc_network_common::sync::BadPeer> { + unimplemented!("Not supported on the RPC collator") + } + + fn on_state_data( + &mut self, + _who: &PeerId, + _response: sc_network_common::sync::OpaqueStateResponse, + ) -> Result, sc_network_common::sync::BadPeer> { + unimplemented!("Not supported on the RPC collator") + } + + fn on_warp_sync_data( + &mut self, + _who: &PeerId, + _response: sc_network_common::sync::warp::EncodedProof, + ) -> Result<(), sc_network_common::sync::BadPeer> { + unimplemented!("Not supported on the RPC collator") + } + + fn on_block_justification( + &mut self, + _who: PeerId, + _response: sc_network_common::sync::message::BlockResponse, + ) -> Result, sc_network_common::sync::BadPeer> + { + unimplemented!("Not supported on the RPC collator") + } + + fn on_blocks_processed( + &mut self, + _imported: usize, + _count: usize, + _results: Vec<( + Result< + sc_consensus::BlockImportStatus>, + sc_consensus::BlockImportError, + >, + ::Hash, + )>, + ) -> Box< + dyn Iterator< + Item = Result< + (PeerId, sc_network_common::sync::message::BlockRequest), + sc_network_common::sync::BadPeer, + >, + >, + > { + Box::new(std::iter::empty()) + } + + fn on_justification_import( + &mut self, + _hash: ::Hash, + _number: polkadot_service::NumberFor, + _success: bool, + ) { + } + + fn on_block_finalized( + &mut self, + _hash: &::Hash, + _number: polkadot_service::NumberFor, + ) { + } + + fn push_block_announce_validation( + &mut self, + _who: PeerId, + _hash: ::Hash, + _announce: sc_network_common::sync::message::BlockAnnounce<::Header>, + _is_best: bool, + ) { + } + + fn poll_block_announce_validation( + &mut self, + _cx: &mut std::task::Context, + ) -> std::task::Poll::Header>> + { + std::task::Poll::Pending + } + + fn peer_disconnected( + &mut self, + _who: &PeerId, + ) -> Option> { + None + } + + fn metrics(&self) -> sc_network_common::sync::Metrics { + Metrics { + queued_blocks: 0, + fork_targets: 0, + justifications: sc_network_common::sync::metrics::Metrics { + pending_requests: 0, + active_requests: 0, + importing_requests: 0, + failed_requests: 0, + }, + } + } + + fn create_opaque_block_request( + &self, + _request: &sc_network_common::sync::message::BlockRequest, + ) -> sc_network_common::sync::OpaqueBlockRequest { + unimplemented!("Not supported on the RPC collator") + } + + fn encode_block_request( + &self, + _request: &sc_network_common::sync::OpaqueBlockRequest, + ) -> Result, String> { + unimplemented!("Not supported on the RPC collator") + } + + fn decode_block_response( + &self, + _response: &[u8], + ) -> Result { + unimplemented!("Not supported on the RPC collator") + } + + fn block_response_into_blocks( + &self, + _request: &sc_network_common::sync::message::BlockRequest, + _response: sc_network_common::sync::OpaqueBlockResponse, + ) -> Result>, String> { + unimplemented!("Not supported on the RPC collator") + } + + fn encode_state_request( + &self, + _request: &sc_network_common::sync::OpaqueStateRequest, + ) -> Result, String> { + unimplemented!("Not supported on the RPC collator") + } + + fn decode_state_response( + &self, + _response: &[u8], + ) -> Result { + unimplemented!("Not supported on the RPC collator") + } +} + +struct DummyImportQueue; + +impl sc_service::ImportQueue for DummyImportQueue { + fn import_blocks( + &mut self, + _origin: BlockOrigin, + _blocks: Vec>, + ) { + } + + fn import_justifications( + &mut self, + _who: PeerId, + _hash: Hash, + _number: NumberFor, + _justifications: Justifications, + ) { + } + + fn poll_actions( + &mut self, + _cx: &mut futures::task::Context, + _link: &mut dyn sc_consensus::import_queue::Link, + ) { + } +} diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml index fe2f9570100..d694862d219 100644 --- a/client/relay-chain-rpc-interface/Cargo.toml +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -11,8 +11,11 @@ polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "m cumulus-primitives-core = { path = "../../primitives/core" } cumulus-relay-chain-interface = { path = "../relay-chain-interface" } +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs index f295c693ecd..1d35ec4e747 100644 --- a/client/relay-chain-rpc-interface/src/lib.rs +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -44,11 +44,12 @@ const TIMEOUT_IN_SECONDS: u64 = 6; #[derive(Clone)] pub struct RelayChainRpcInterface { rpc_client: RelayChainRpcClient, + overseer_handle: Handle, } impl RelayChainRpcInterface { - pub fn new(rpc_client: RelayChainRpcClient) -> Self { - Self { rpc_client } + pub fn new(rpc_client: RelayChainRpcClient, overseer_handle: Handle) -> Self { + Self { rpc_client, overseer_handle } } } @@ -118,15 +119,15 @@ impl RelayChainInterface for RelayChainRpcInterface { } async fn best_block_hash(&self) -> RelayChainResult { - self.rpc_client.chain_get_head().await + self.rpc_client.chain_get_head(None).await } async fn is_major_syncing(&self) -> RelayChainResult { self.rpc_client.system_health().await.map(|h| h.is_syncing) } - fn overseer_handle(&self) -> RelayChainResult> { - unimplemented!("Overseer handle is not available on relay-chain-rpc-interface"); + fn overseer_handle(&self) -> RelayChainResult { + Ok(self.overseer_handle.clone()) } async fn get_storage_by_key( diff --git a/client/relay-chain-rpc-interface/src/rpc_client.rs b/client/relay-chain-rpc-interface/src/rpc_client.rs index 71014b18e0e..3422248735f 100644 --- a/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -17,8 +17,13 @@ use backoff::{future::retry_notify, ExponentialBackoff}; use cumulus_primitives_core::{ relay_chain::{ - v2::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, - Hash as PHash, Header as PHeader, InboundHrmpMessage, + v2::{ + CandidateCommitments, CandidateEvent, CommittedCandidateReceipt, CoreState, + DisputeState, GroupRotationInfo, OccupiedCoreAssumption, OldV1SessionInfo, + PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + }, + CandidateHash, Hash as PHash, Header as PHeader, InboundHrmpMessage, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -37,9 +42,11 @@ use jsonrpsee::{ ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; -use polkadot_service::TaskManager; +use polkadot_service::{BlockNumber, TaskManager}; use sc_client_api::StorageData; use sc_rpc_api::{state::ReadProof, system::Health}; +use sp_api::RuntimeVersion; +use sp_consensus_babe::Epoch; use sp_core::sp_std::collections::btree_map::BTreeMap; use sp_runtime::DeserializeOwned; use sp_storage::StorageKey; @@ -253,8 +260,6 @@ impl RelayChainRpcClient { Decode::decode(&mut &*res.0).map_err(Into::into) } - /// Subscribe to a notification stream via RPC - /// Perform RPC request async fn request<'a, R>( &self, @@ -300,10 +305,69 @@ impl RelayChainRpcClient { RelayChainError::RpcCallError(method.to_string(), err)}) } + /// Returns information regarding the current epoch. + pub async fn babe_api_current_epoch(&self, at: PHash) -> Result { + self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await + } + + /// Old method to fetch v1 session info. + pub async fn parachain_host_session_info_before_version_2( + &self, + at: PHash, + index: SessionIndex, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_session_info_before_version_2", + at, + Some(index), + ) + .await + } + + /// Scrape dispute relevant from on-chain, backing votes and resolved disputes. + pub async fn parachain_host_on_chain_votes( + &self, + at: PHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>) + .await + } + + /// Returns code hashes of PVFs that require pre-checking by validators in the active set. + pub async fn parachain_host_pvfs_require_precheck( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>) + .await + } + + /// Submits a PVF pre-checking statement into the transaction pool. + pub async fn parachain_host_submit_pvf_check_statement( + &self, + at: PHash, + stmt: PvfCheckStatement, + signature: ValidatorSignature, + ) -> Result<(), RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_submit_pvf_check_statement", + at, + Some((stmt, signature)), + ) + .await + } + + /// Get local listen address of the node + pub async fn system_local_listen_addresses(&self) -> Result, RelayChainError> { + self.request("system_localListenAddresses", None).await + } + + /// Get system health information pub async fn system_health(&self) -> Result { self.request("system_health", None).await } + /// Get read proof for `storage_keys` pub async fn state_get_read_proof( &self, storage_keys: Vec, @@ -313,6 +377,7 @@ impl RelayChainRpcClient { self.request("state_getReadProof", params).await } + /// Retrieve storage item at `storage_key` pub async fn state_get_storage( &self, storage_key: StorageKey, @@ -322,10 +387,191 @@ impl RelayChainRpcClient { self.request("state_getStorage", params).await } - pub async fn chain_get_head(&self) -> Result { - self.request("chain_getHead", None).await + /// Get hash of the n-th block in the canon chain. + /// + /// By default returns latest block hash. + pub async fn chain_get_head(&self, at: Option) -> Result { + let params = rpc_params!(at); + self.request("chain_getHead", params).await + } + + /// Returns the validator groups and rotation info localized based on the hypothetical child + /// of a block whose state this is invoked on. Note that `now` in the `GroupRotationInfo` + /// should be the successor of the number of the block. + pub async fn parachain_host_validator_groups( + &self, + at: PHash, + ) -> Result<(Vec>, GroupRotationInfo), RelayChainError> { + self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>) + .await + } + + /// Get a vector of events concerning candidates that occurred within a block. + pub async fn parachain_host_candidate_events( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>) + .await + } + + /// Checks if the given validation outputs pass the acceptance criteria. + pub async fn parachain_host_check_validation_outputs( + &self, + at: PHash, + para_id: ParaId, + outputs: CandidateCommitments, + ) -> Result { + self.call_remote_runtime_function( + "ParachainHost_check_validation_outputs", + at, + Some((para_id, outputs)), + ) + .await + } + + /// Returns the persisted validation data for the given `ParaId` along with the corresponding + /// validation code hash. Instead of accepting assumption about the para, matches the validation + /// data hash against an expected one and yields `None` if they're not equal. + pub async fn parachain_host_assumed_validation_data( + &self, + at: PHash, + para_id: ParaId, + expected_hash: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_persisted_assumed_validation_data", + at, + Some((para_id, expected_hash)), + ) + .await + } + + /// Get hash of last finalized block. + pub async fn chain_get_finalized_head(&self) -> Result { + self.request("chain_getFinalizedHead", None).await + } + + /// Get hash of n-th block. + pub async fn chain_get_block_hash( + &self, + block_number: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(block_number); + self.request("chain_getBlockHash", params).await + } + + /// Yields the persisted validation data for the given `ParaId` along with an assumption that + /// should be used if the para currently occupies a core. + /// + /// Returns `None` if either the para is not registered or the assumption is `Freed` + /// and the para already occupies a core. + pub async fn parachain_host_persisted_validation_data( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_persisted_validation_data", + at, + Some((para_id, occupied_core_assumption)), + ) + .await + } + + /// Get the validation code from its hash. + pub async fn parachain_host_validation_code_by_hash( + &self, + at: PHash, + validation_code_hash: ValidationCodeHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_validation_code_by_hash", + at, + Some(validation_code_hash), + ) + .await + } + + /// Yields information on all availability cores as relevant to the child block. + /// Cores are either free or occupied. Free cores can have paras assigned to them. + pub async fn parachain_host_availability_cores( + &self, + at: PHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>) + .await + } + + /// Get runtime version + pub async fn runtime_version(&self, at: PHash) -> Result { + let params = rpc_params!(at); + self.request("state_getRuntimeVersion", params).await + } + + /// Returns all onchain disputes. + /// This is a staging method! Do not use on production runtimes! + pub async fn parachain_host_staging_get_disputes( + &self, + at: PHash, + ) -> Result)>, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_staging_get_disputes", at, None::<()>) + .await + } + + pub async fn authority_discovery_authorities( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>) + .await + } + + /// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`. + /// + /// Returns `None` if either the para is not registered or the assumption is `Freed` + /// and the para already occupies a core. + pub async fn parachain_host_validation_code( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_validation_code", + at, + Some((para_id, occupied_core_assumption)), + ) + .await + } + + /// Fetch the hash of the validation code used by a para, making the given `OccupiedCoreAssumption`. + pub async fn parachain_host_validation_code_hash( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_validation_code_hash", + at, + Some((para_id, occupied_core_assumption)), + ) + .await } + /// Get the session info for the given session, if stored. + pub async fn parachain_host_session_info( + &self, + at: PHash, + index: SessionIndex, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index)) + .await + } + + /// Get header at specified hash pub async fn chain_get_header( &self, hash: Option, @@ -334,6 +580,8 @@ impl RelayChainRpcClient { self.request("chain_getHeader", params).await } + /// Get the receipt of a candidate pending availability. This returns `Some` for any paras + /// assigned to occupied cores in `availability_cores` and `None` otherwise. pub async fn parachain_host_candidate_pending_availability( &self, at: PHash, @@ -347,6 +595,9 @@ impl RelayChainRpcClient { .await } + /// Returns the session index expected at a child of the block. + /// + /// This can be used to instantiate a `SigningContext`. pub async fn parachain_host_session_index_for_child( &self, at: PHash, @@ -355,6 +606,7 @@ impl RelayChainRpcClient { .await } + /// Get the current validators. pub async fn parachain_host_validators( &self, at: PHash, @@ -363,20 +615,8 @@ impl RelayChainRpcClient { .await } - pub async fn parachain_host_persisted_validation_data( - &self, - at: PHash, - para_id: ParaId, - occupied_core_assumption: OccupiedCoreAssumption, - ) -> Result, RelayChainError> { - self.call_remote_runtime_function( - "ParachainHost_persisted_validation_data", - at, - Some((para_id, occupied_core_assumption)), - ) - .await - } - + /// Get the contents of all channels addressed to the given recipient. Channels that have no + /// messages in them are also included. pub async fn parachain_host_inbound_hrmp_channels_contents( &self, para_id: ParaId, @@ -390,6 +630,7 @@ impl RelayChainRpcClient { .await } + /// Get all the pending inbound messages in the downward message queue for a para. pub async fn parachain_host_dmq_contents( &self, para_id: ParaId, @@ -399,15 +640,7 @@ impl RelayChainRpcClient { .await } - fn send_register_message_to_worker( - &self, - message: NotificationRegisterMessage, - ) -> Result<(), RelayChainError> { - self.to_worker_channel - .try_send(message) - .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string())) - } - + /// Get a stream of all imported relay chain headers pub async fn get_imported_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker(NotificationRegisterMessage::RegisterImportListener( @@ -416,6 +649,7 @@ impl RelayChainRpcClient { Ok(rx) } + /// Get a stream of new best relay chain headers pub async fn get_best_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker( @@ -424,6 +658,7 @@ impl RelayChainRpcClient { Ok(rx) } + /// Get a stream of finalized relay chain headers pub async fn get_finalized_heads_stream(&self) -> Result, RelayChainError> { let (tx, rx) = futures::channel::mpsc::channel::(NOTIFICATION_CHANNEL_SIZE_LIMIT); self.send_register_message_to_worker( @@ -432,6 +667,15 @@ impl RelayChainRpcClient { Ok(rx) } + fn send_register_message_to_worker( + &self, + message: NotificationRegisterMessage, + ) -> Result<(), RelayChainError> { + self.to_worker_channel + .try_send(message) + .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string())) + } + async fn subscribe_imported_heads( ws_client: &JsonRpcClient, ) -> Result, RelayChainError> { diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 269112b1622..067ca1c83f3 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -18,7 +18,6 @@ //! //! Provides functions for starting a collator node or a normal full node. -use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_interface::RelayChainInterface; @@ -108,8 +107,7 @@ where let overseer_handle = relay_chain_interface .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))? - .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?; + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new( overseer_handle.clone(), @@ -149,7 +147,6 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> { pub announce_block: Arc>) + Send + Sync>, pub relay_chain_slot_duration: Duration, pub import_queue: IQ, - pub collator_options: CollatorOptions, } /// Start a full node for a parachain. @@ -165,7 +162,6 @@ pub fn start_full_node( para_id, relay_chain_slot_duration, import_queue, - collator_options, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where @@ -193,18 +189,9 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); - // PoV Recovery is currently not supported when we connect to the - // relay chain via RPC, so we return early. The node will work, but not be able to recover PoVs from the - // relay chain if blocks are not announced on parachain. This will be enabled again once - // https://github.com/paritytech/cumulus/issues/545 is finished. - if collator_options.relay_chain_rpc_url.is_some() { - return Ok(()) - } - let overseer_handle = relay_chain_interface .overseer_handle() - .map_err(|e| sc_service::Error::Application(Box::new(e)))? - .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?; + .map_err(|e| sc_service::Error::Application(Box::new(e)))?; let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new( overseer_handle, diff --git a/parachain-template/node/Cargo.toml b/parachain-template/node/Cargo.toml index a1b1408936e..6db0087fffe 100644 --- a/parachain-template/node/Cargo.toml +++ b/parachain-template/node/Cargo.toml @@ -70,6 +70,7 @@ cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inh cumulus-relay-chain-inprocess-interface = { path = "../../client/relay-chain-inprocess-interface" } cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" } cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-interface" } +cumulus-relay-chain-minimal-node = { path = "../../client/relay-chain-minimal-node" } [build-dependencies] substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/parachain-template/node/src/command.rs b/parachain-template/node/src/command.rs index 7f7bfd140af..ee497b413f5 100644 --- a/parachain-template/node/src/command.rs +++ b/parachain-template/node/src/command.rs @@ -4,7 +4,7 @@ use codec::Encode; use cumulus_client_cli::generate_genesis_block; use cumulus_primitives_core::ParaId; use frame_benchmarking_cli::{BenchmarkCmd, SUBSTRATE_REFERENCE_HARDWARE}; -use log::info; +use log::{info, warn}; use parachain_template_runtime::{Block, RuntimeApi}; use sc_cli::{ ChainSpec, CliConfiguration, DefaultConfigurationValues, ImportParams, KeystoreParams, @@ -304,6 +304,10 @@ pub fn run() -> Result<()> { info!("Parachain genesis state: {}", genesis_state); info!("Is collating: {}", if config.role.is_authority() { "yes" } else { "no" }); + if collator_options.relay_chain_rpc_url.is_some() && cli.relay_chain_args.len() > 0 { + warn!("Detected relay chain node arguments together with --relay-chain-rpc-url. This command starts a minimal Polkadot node that only uses a network-related subset of all relay chain CLI options."); + } + crate::service::start_parachain_node( config, polkadot_config, diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index a4dda555749..91d8d54244d 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -22,7 +22,7 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; +use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; // Substrate Imports use sc_executor::NativeElseWasmExecutor; @@ -176,10 +176,8 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => { - let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; - Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) - }, + Some(relay_chain_url) => + build_minimal_relay_chain_node(polkadot_config, task_manager, relay_chain_url).await, None => build_inprocess_relay_chain( polkadot_config, parachain_config, @@ -365,7 +363,6 @@ where )?; let spawner = task_manager.spawn_handle(); - let params = StartCollatorParams { para_id: id, block_status: client.clone(), @@ -390,7 +387,6 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, - collator_options, }; start_full_node(params)?; diff --git a/polkadot-parachain/Cargo.toml b/polkadot-parachain/Cargo.toml index 68b5830bd0c..356662560f6 100644 --- a/polkadot-parachain/Cargo.toml +++ b/polkadot-parachain/Cargo.toml @@ -83,6 +83,7 @@ cumulus-primitives-parachain-inherent = { path = "../primitives/parachain-inhere cumulus-relay-chain-interface = { path = "../client/relay-chain-interface" } cumulus-relay-chain-inprocess-interface = { path = "../client/relay-chain-inprocess-interface" } cumulus-relay-chain-rpc-interface = { path = "../client/relay-chain-rpc-interface" } +cumulus-relay-chain-minimal-node = { path = "../client/relay-chain-minimal-node" } [build-dependencies] substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot-parachain/src/cli.rs b/polkadot-parachain/src/cli.rs index c99e5459836..36a4cfd562d 100644 --- a/polkadot-parachain/src/cli.rs +++ b/polkadot-parachain/src/cli.rs @@ -83,7 +83,7 @@ pub struct Cli { pub no_hardware_benchmarks: bool, /// Relay chain arguments - #[clap(raw = true, conflicts_with = "relay-chain-rpc-url")] + #[clap(raw = true)] pub relaychain_args: Vec, } diff --git a/polkadot-parachain/src/command.rs b/polkadot-parachain/src/command.rs index 0c3fbe67026..fbffbc03c1b 100644 --- a/polkadot-parachain/src/command.rs +++ b/polkadot-parachain/src/command.rs @@ -26,7 +26,7 @@ use codec::Encode; use cumulus_client_cli::generate_genesis_block; use cumulus_primitives_core::ParaId; use frame_benchmarking_cli::{BenchmarkCmd, SUBSTRATE_REFERENCE_HARDWARE}; -use log::info; +use log::{info, warn}; use parachains_common::{AuraId, StatemintAuraId}; use sc_cli::{ ChainSpec, CliConfiguration, DefaultConfigurationValues, ImportParams, KeystoreParams, @@ -678,6 +678,10 @@ pub fn run() -> Result<()> { info!("Parachain genesis state: {}", genesis_state); info!("Is collating: {}", if config.role.is_authority() { "yes" } else { "no" }); + if collator_options.relay_chain_rpc_url.is_some() && cli.relaychain_args.len() > 0 { + warn!("Detected relay chain node arguments together with --relay-chain-rpc-url. This command starts a minimal Polkadot node that only uses a network-related subset of all relay chain CLI options."); + } + match config.chain_spec.runtime() { Runtime::Statemint => crate::service::start_generic_aura_node::< statemint_runtime::RuntimeApi, diff --git a/polkadot-parachain/src/service.rs b/polkadot-parachain/src/service.rs index 45f14319749..ca47af08eec 100644 --- a/polkadot-parachain/src/service.rs +++ b/polkadot-parachain/src/service.rs @@ -30,7 +30,7 @@ use cumulus_primitives_core::{ }; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; +use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; use polkadot_service::CollatorPair; use sp_core::Pair; @@ -267,10 +267,8 @@ async fn build_relay_chain_interface( hwbench: Option, ) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { match collator_options.relay_chain_rpc_url { - Some(relay_chain_url) => { - let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; - Ok((Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>, None)) - }, + Some(relay_chain_url) => + build_minimal_relay_chain_node(polkadot_config, task_manager, relay_chain_url).await, None => build_inprocess_relay_chain( polkadot_config, parachain_config, @@ -467,7 +465,6 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, - collator_options, }; start_full_node(params)?; @@ -677,7 +674,6 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, - collator_options, }; start_full_node(params)?; @@ -1492,7 +1488,6 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, - collator_options, }; start_full_node(params)?; diff --git a/test/service/Cargo.toml b/test/service/Cargo.toml index 8d9c9c04fa7..bd3f4b9c90c 100644 --- a/test/service/Cargo.toml +++ b/test/service/Cargo.toml @@ -69,6 +69,7 @@ cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" } cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-interface" } cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" } cumulus-test-runtime = { path = "../runtime" } +cumulus-relay-chain-minimal-node = { path = "../../client/relay-chain-minimal-node" } [dev-dependencies] futures = "0.3.24" diff --git a/test/service/src/cli.rs b/test/service/src/cli.rs index 40b63798856..4cf4b925997 100644 --- a/test/service/src/cli.rs +++ b/test/service/src/cli.rs @@ -41,7 +41,7 @@ pub struct TestCollatorCli { pub parachain_id: u32, /// Relay chain arguments - #[clap(raw = true, conflicts_with = "relay-chain-rpc-url")] + #[clap(raw = true)] pub relaychain_args: Vec, #[clap(long)] diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index c211ece81d6..9e9f5883f97 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -38,7 +38,8 @@ use cumulus_client_service::{ use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; -use cumulus_relay_chain_rpc_interface::{create_client_and_start_worker, RelayChainRpcInterface}; +use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node; + use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use frame_system_rpc_runtime_api::AccountNonceApi; @@ -183,8 +184,9 @@ async fn build_relay_chain_interface( task_manager: &mut TaskManager, ) -> RelayChainResult> { if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { - let client = create_client_and_start_worker(relay_chain_url, task_manager).await?; - return Ok(Arc::new(RelayChainRpcInterface::new(client)) as Arc<_>) + return build_minimal_relay_chain_node(relay_chain_config, task_manager, relay_chain_url) + .await + .map(|r| r.0) } let relay_chain_full_node = polkadot_test_service::new_full( @@ -198,12 +200,15 @@ async fn build_relay_chain_interface( )?; task_manager.add_child(relay_chain_full_node.task_manager); + tracing::info!("Using inprocess node."); Ok(Arc::new(RelayChainInProcessInterface::new( relay_chain_full_node.client.clone(), relay_chain_full_node.backend.clone(), Arc::new(relay_chain_full_node.network.clone()), - relay_chain_full_node.overseer_handle, - )) as Arc<_>) + relay_chain_full_node.overseer_handle.ok_or(RelayChainError::GenericError( + "Overseer should be running in full node.".to_string(), + ))?, + ))) } /// Start a node with the given parachain `Configuration` and relay chain `Configuration`. @@ -367,7 +372,6 @@ where // the recovery delay of pov-recovery. We don't want to wait for too // long on the full node to recover, so we reduce this time here. relay_chain_slot_duration: Duration::from_millis(6), - collator_options, }; start_full_node(params)?; @@ -473,9 +477,9 @@ impl TestNodeBuilder { /// node. pub fn connect_to_parachain_nodes<'a>( mut self, - nodes: impl Iterator, + nodes: impl IntoIterator, ) -> Self { - self.parachain_nodes.extend(nodes.map(|n| n.addr.clone())); + self.parachain_nodes.extend(nodes.into_iter().map(|n| n.addr.clone())); self } diff --git a/test/service/src/main.rs b/test/service/src/main.rs index b87a8ed191d..93be592a472 100644 --- a/test/service/src/main.rs +++ b/test/service/src/main.rs @@ -78,7 +78,8 @@ fn main() -> Result<(), sc_cli::Error> { }) }, None => { - let mut builder = sc_cli::LoggerBuilder::new(""); + let log_filters = cli.run.normalize().log_filters(); + let mut builder = sc_cli::LoggerBuilder::new(log_filters.unwrap_or_default()); builder.with_colors(true); let _ = builder.init(); diff --git a/zombienet_tests/0006-rpc_collator_builds_blocks.feature b/zombienet_tests/0006-rpc_collator_builds_blocks.feature new file mode 100644 index 00000000000..558e65f96db --- /dev/null +++ b/zombienet_tests/0006-rpc_collator_builds_blocks.feature @@ -0,0 +1,17 @@ +Description: RPC collator should build blocks +Network: ./0006-rpc_collator_builds_blocks.toml +Creds: config + +alice: is up +bob: is up +charlie: is up +one: is up +two: is up +dave: is up +eve: is up + +alice: parachain 2000 is registered within 225 seconds +alice: parachain 2000 block height is at least 10 within 250 seconds + +dave: reports block height is at least 12 within 250 seconds +eve: reports block height is at least 12 within 250 seconds diff --git a/zombienet_tests/0006-rpc_collator_builds_blocks.toml b/zombienet_tests/0006-rpc_collator_builds_blocks.toml new file mode 100644 index 00000000000..9414532682a --- /dev/null +++ b/zombienet_tests/0006-rpc_collator_builds_blocks.toml @@ -0,0 +1,46 @@ +[relaychain] +default_image = "{{RELAY_IMAGE}}" +default_command = "polkadot" +default_args = [ "-lparachain=debug" ] + +chain = "rococo-local" + + [[relaychain.nodes]] + name = "alice" + validator = true + + [[relaychain.nodes]] + name = "bob" + validator = true + + [[relaychain.nodes]] + name = "charlie" + validator = true + + [[relaychain.nodes]] + name = "one" + validator = false + + [[relaychain.nodes]] + name = "two" + validator = false + +[[parachains]] +id = 2000 +cumulus_based = true + + # run dave as parachain full node + [[parachains.collators]] + name = "dave" + validator = true + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain=debug,blockchain-rpc-client=debug", "--relay-chain-rpc-url {{'one'|zombie('wsUri')}}", "-- --bootnodes {{'one'|zombie('multiAddress')}}"] + + # run eve as parachain full node + [[parachains.collators]] + name = "eve" + validator = true + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain=debug,blockchain-rpc-client=debug", "--relay-chain-rpc-url {{'two'|zombie('wsUri')}}", "-- --bootnodes {{'two'|zombie('multiAddress')}}"]