diff --git a/Cargo.lock b/Cargo.lock index 552ef2bb432..d25c4e24344 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2225,10 +2225,17 @@ name = "cumulus-client-consensus-aura" version = "0.1.0" dependencies = [ "async-trait", + "cumulus-client-collator", "cumulus-client-consensus-common", + "cumulus-client-consensus-proposer", "cumulus-primitives-core", + "cumulus-primitives-parachain-inherent", + "cumulus-relay-chain-interface", "futures", "parity-scale-codec", + "polkadot-node-primitives", + "polkadot-overseer", + "polkadot-primitives", "sc-client-api", "sc-consensus", "sc-consensus-aura", @@ -2244,6 +2251,8 @@ dependencies = [ "sp-inherents", "sp-keystore", "sp-runtime", + "sp-state-machine", + "sp-timestamp", "substrate-prometheus-endpoint", "tracing", ] @@ -2268,12 +2277,28 @@ dependencies = [ "schnellru", "sp-blockchain", "sp-consensus", + "sp-core", "sp-runtime", "sp-tracing", "sp-trie", + "substrate-prometheus-endpoint", "tracing", ] +[[package]] +name = "cumulus-client-consensus-proposer" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "cumulus-primitives-parachain-inherent", + "sp-consensus", + "sp-inherents", + "sp-runtime", + "sp-state-machine", + "thiserror", +] + [[package]] name = "cumulus-client-consensus-relay-chain" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4ceb8d5c04a..2e96126cfa6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "client/cli", "client/consensus/aura", "client/consensus/common", + "client/consensus/proposer", "client/consensus/relay-chain", "client/network", "client/pov-recovery", diff --git a/client/collator/Cargo.toml b/client/collator/Cargo.toml index b8e05fd37c5..6b04a319dcc 100644 --- a/client/collator/Cargo.toml +++ b/client/collator/Cargo.toml @@ -5,15 +5,15 @@ authors = ["Parity Technologies "] edition = "2021" [dependencies] +parking_lot = "0.12.1" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] } futures = "0.3.21" -parking_lot = "0.12.0" tracing = "0.1.25" # Substrate sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } -sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/client/collator/src/lib.rs b/client/collator/src/lib.rs index 9e3b4953c0a..aca40b2b342 100644 --- a/client/collator/src/lib.rs +++ b/client/collator/src/lib.rs @@ -16,50 +16,46 @@ //! Cumulus Collator implementation for Substrate. -use cumulus_client_network::WaitToAnnounce; use cumulus_primitives_core::{ - relay_chain::Hash as PHash, CollationInfo, CollectCollationInfo, ParachainBlockData, - PersistedValidationData, + relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData, }; use sc_client_api::BlockBackend; -use sp_api::{ApiExt, ProvideRuntimeApi}; -use sp_consensus::BlockStatus; +use sp_api::ProvideRuntimeApi; use sp_core::traits::SpawnNamed; -use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use cumulus_client_consensus_common::ParachainConsensus; -use polkadot_node_primitives::{ - BlockData, Collation, CollationGenerationConfig, CollationResult, MaybeCompressedPoV, PoV, -}; -use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage}; +use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV}; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{CollatorPair, Id as ParaId}; use codec::{Decode, Encode}; -use futures::{channel::oneshot, FutureExt}; -use parking_lot::Mutex; +use futures::prelude::*; use std::sync::Arc; -use tracing::Instrument; + +use crate::service::CollatorService; + +pub mod service; /// The logging target. const LOG_TARGET: &str = "cumulus-collator"; /// The implementation of the Cumulus `Collator`. +/// +/// Note that this implementation is soon to be deprecated and removed, and it is suggested to +/// directly use the [`CollatorService`] instead, so consensus engine implementations +/// live at the top level. pub struct Collator { - block_status: Arc, + service: CollatorService, parachain_consensus: Box>, - wait_to_announce: Arc>>, - runtime_api: Arc, } impl Clone for Collator { fn clone(&self) -> Self { - Self { - block_status: self.block_status.clone(), - wait_to_announce: self.wait_to_announce.clone(), + Collator { + service: self.service.clone(), parachain_consensus: self.parachain_consensus.clone(), - runtime_api: self.runtime_api.clone(), } } } @@ -73,159 +69,10 @@ where { /// Create a new instance. fn new( - block_status: Arc, - spawner: Arc, - announce_block: Arc>) + Send + Sync>, - runtime_api: Arc, + collator_service: CollatorService, parachain_consensus: Box>, ) -> Self { - let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block))); - - Self { block_status, wait_to_announce, runtime_api, parachain_consensus } - } - - /// Checks the status of the given block hash in the Parachain. - /// - /// Returns `true` if the block could be found and is good to be build on. - fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool { - match self.block_status.block_status(hash) { - Ok(BlockStatus::Queued) => { - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Skipping candidate production, because block is still queued for import.", - ); - false - }, - Ok(BlockStatus::InChainWithState) => true, - Ok(BlockStatus::InChainPruned) => { - tracing::error!( - target: LOG_TARGET, - "Skipping candidate production, because block `{:?}` is already pruned!", - hash, - ); - false - }, - Ok(BlockStatus::KnownBad) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - "Block is tagged as known bad and is included in the relay chain! Skipping candidate production!", - ); - false - }, - Ok(BlockStatus::Unknown) => { - if header.number().is_zero() { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - "Could not find the header of the genesis block in the database!", - ); - } else { - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Skipping candidate production, because block is unknown.", - ); - } - false - }, - Err(e) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - error = ?e, - "Failed to get block status.", - ); - false - }, - } - } - - /// Fetch the collation info from the runtime. - /// - /// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime. - fn fetch_collation_info( - &self, - block_hash: Block::Hash, - header: &Block::Header, - ) -> Result, sp_api::ApiError> { - let runtime_api = self.runtime_api.runtime_api(); - - let api_version = - match runtime_api.api_version::>(block_hash)? { - Some(version) => version, - None => { - tracing::error!( - target: LOG_TARGET, - "Could not fetch `CollectCollationInfo` runtime api version." - ); - return Ok(None) - }, - }; - - let collation_info = if api_version < 2 { - #[allow(deprecated)] - runtime_api - .collect_collation_info_before_version_2(block_hash)? - .into_latest(header.encode().into()) - } else { - runtime_api.collect_collation_info(block_hash, header)? - }; - - Ok(Some(collation_info)) - } - - fn build_collation( - &self, - block: ParachainBlockData, - block_hash: Block::Hash, - pov: PoV, - ) -> Option { - let collation_info = self - .fetch_collation_info(block_hash, block.header()) - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Failed to collect collation info.", - ) - }) - .ok() - .flatten()?; - - let upward_messages = collation_info - .upward_messages - .try_into() - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`", - ) - }) - .ok()?; - let horizontal_messages = collation_info - .horizontal_messages - .try_into() - .map_err(|e| { - tracing::error!( - target: LOG_TARGET, - error = ?e, - "Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`", - ) - }) - .ok()?; - - Some(Collation { - upward_messages, - new_validation_code: collation_info.new_validation_code, - processed_downward_messages: collation_info.processed_downward_messages, - horizontal_messages, - hrmp_watermark: collation_info.hrmp_watermark, - head_data: collation_info.head_data, - proof_of_validity: MaybeCompressedPoV::Compressed(pov), - }) + Self { service: collator_service, parachain_consensus } } async fn produce_candidate( @@ -252,7 +99,7 @@ where }; let last_head_hash = last_head.hash(); - if !self.check_block_status(last_head_hash, &last_head) { + if !self.service.check_block_status(last_head_hash, &last_head) { return None } @@ -268,19 +115,9 @@ where .produce_candidate(&last_head, relay_parent, &validation_data) .await?; - let (header, extrinsics) = candidate.block.deconstruct(); - - let compact_proof = - match candidate.proof.into_compact_proof::>(*last_head.state_root()) { - Ok(proof) => proof, - Err(e) => { - tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e); - return None - }, - }; + let block_hash = candidate.block.header().hash(); - // Create the parachain block data for the validators. - let b = ParachainBlockData::::new(header, extrinsics, compact_proof); + let (collation, b) = self.service.build_collation(&last_head, block_hash, candidate)?; tracing::info!( target: LOG_TARGET, @@ -290,25 +127,109 @@ where b.storage_proof().encode().len() as f64 / 1024f64, ); - let pov = - polkadot_node_primitives::maybe_compress_pov(PoV { block_data: BlockData(b.encode()) }); + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } - tracing::info!( - target: LOG_TARGET, - "Compressed PoV size: {}kb", - pov.block_data.0.len() as f64 / 1024f64, - ); + let result_sender = self.service.announce_with_barrier(block_hash); - let block_hash = b.header().hash(); - let collation = self.build_collation(b, block_hash, pov)?; + tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",); - let (result_sender, signed_stmt_recv) = oneshot::channel(); + Some(CollationResult { collation, result_sender: Some(result_sender) }) + } +} - self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv); +/// Relay-chain-driven collators are those whose block production is driven purely +/// by new relay chain blocks and the most recently included parachain blocks +/// within them. +/// +/// This method of driving collators is not suited to anything but the most simple parachain +/// consensus mechanisms, and this module may soon be deprecated. +pub mod relay_chain_driven { + use futures::{ + channel::{mpsc, oneshot}, + prelude::*, + }; + use polkadot_node_primitives::{CollationGenerationConfig, CollationResult}; + use polkadot_node_subsystem::messages::{CollationGenerationMessage, CollatorProtocolMessage}; + use polkadot_overseer::Handle as OverseerHandle; + use polkadot_primitives::{CollatorPair, Id as ParaId}; - tracing::info!(target: LOG_TARGET, ?block_hash, "Produced proof-of-validity candidate.",); + use cumulus_primitives_core::{relay_chain::Hash as PHash, PersistedValidationData}; - Some(CollationResult { collation, result_sender: Some(result_sender) }) + /// A request to author a collation, based on the advancement of the relay chain. + /// + /// See the module docs for more info on relay-chain-driven collators. + pub struct CollationRequest { + relay_parent: PHash, + pvd: PersistedValidationData, + sender: oneshot::Sender>, + } + + impl CollationRequest { + /// Get the relay parent of the collation request. + pub fn relay_parent(&self) -> &PHash { + &self.relay_parent + } + + /// Get the [`PersistedValidationData`] for the request. + pub fn persisted_validation_data(&self) -> &PersistedValidationData { + &self.pvd + } + + /// Complete the request with a collation, if any. + pub fn complete(self, collation: Option) { + let _ = self.sender.send(collation); + } + } + + /// Initialize the collator with Polkadot's collation-generation + /// subsystem, returning a stream of collation requests to handle. + pub async fn init( + key: CollatorPair, + para_id: ParaId, + overseer_handle: OverseerHandle, + ) -> mpsc::Receiver { + let mut overseer_handle = overseer_handle; + + let (stream_tx, stream_rx) = mpsc::channel(0); + let config = CollationGenerationConfig { + key, + para_id, + collator: Box::new(move |relay_parent, validation_data| { + // Cloning the channel on each usage effectively makes the channel + // unbounded. The channel is actually bounded by the block production + // and consensus systems of Polkadot, which limits the amount of possible + // blocks. + let mut stream_tx = stream_tx.clone(); + let validation_data = validation_data.clone(); + Box::pin(async move { + let (this_tx, this_rx) = oneshot::channel(); + let request = + CollationRequest { relay_parent, pvd: validation_data, sender: this_tx }; + + if stream_tx.send(request).await.is_err() { + return None + } + + this_rx.await.ok().flatten() + }) + }), + }; + + overseer_handle + .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator") + .await; + + overseer_handle + .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator") + .await; + + stream_rx } } @@ -330,7 +251,7 @@ pub async fn start_collator( para_id, block_status, announce_block, - mut overseer_handle, + overseer_handle, spawner, key, parachain_consensus, @@ -343,34 +264,28 @@ pub async fn start_collator( RA: ProvideRuntimeApi + Send + Sync + 'static, RA::Api: CollectCollationInfo, { - let collator = Collator::new( - block_status, - Arc::new(spawner), - announce_block, - runtime_api, - parachain_consensus, - ); + let collator_service = + CollatorService::new(block_status, Arc::new(spawner.clone()), announce_block, runtime_api); - let span = tracing::Span::current(); - let config = CollationGenerationConfig { - key, - para_id, - collator: Box::new(move |relay_parent, validation_data| { - let collator = collator.clone(); - collator - .produce_candidate(relay_parent, validation_data.clone()) - .instrument(span.clone()) - .boxed() - }), - }; + let collator = Collator::new(collator_service, parachain_consensus); + + let mut request_stream = relay_chain_driven::init(key, para_id, overseer_handle).await; - overseer_handle - .send_msg(CollationGenerationMessage::Initialize(config), "StartCollator") - .await; + let collation_future = Box::pin(async move { + while let Some(request) = request_stream.next().await { + let collation = collator + .clone() + .produce_candidate( + *request.relay_parent(), + request.persisted_validation_data().clone(), + ) + .await; + + request.complete(collation); + } + }); - overseer_handle - .send_msg(CollatorProtocolMessage::CollateOn(para_id), "StartCollator") - .await; + spawner.spawn("cumulus-relay-driven-collator", None, collation_future); } #[cfg(test)] @@ -378,12 +293,14 @@ mod tests { use super::*; use async_trait::async_trait; use cumulus_client_consensus_common::ParachainCandidate; + use cumulus_primitives_core::ParachainBlockData; use cumulus_test_client::{ Client, ClientBlockImportExt, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt, }; use cumulus_test_runtime::{Block, Header}; use futures::{channel::mpsc, executor::block_on, StreamExt}; + use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_node_subsystem_test_helpers::ForwardSubsystem; use polkadot_overseer::{dummy::dummy_overseer_builder, HeadSupportsParachains}; use sp_consensus::BlockOrigin; diff --git a/client/collator/src/service.rs b/client/collator/src/service.rs new file mode 100644 index 00000000000..7724b0a68a6 --- /dev/null +++ b/client/collator/src/service.rs @@ -0,0 +1,318 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! The Cumulus [`CollatorService`] is a utility struct for performing common +//! operations used in parachain consensus/authoring. + +use cumulus_client_network::WaitToAnnounce; +use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData}; + +use sc_client_api::BlockBackend; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_consensus::BlockStatus; +use sp_core::traits::SpawnNamed; +use sp_runtime::traits::{Block as BlockT, HashFor, Header as HeaderT, Zero}; + +use cumulus_client_consensus_common::ParachainCandidate; +use polkadot_node_primitives::{ + BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV, +}; + +use codec::Encode; +use futures::channel::oneshot; +use parking_lot::Mutex; +use std::sync::Arc; + +/// The logging target. +const LOG_TARGET: &str = "cumulus-collator"; + +/// Utility functions generally applicable to writing collators for Cumulus. +pub trait ServiceInterface { + /// Checks the status of the given block hash in the Parachain. + /// + /// Returns `true` if the block could be found and is good to be build on. + fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool; + + /// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires + /// that the underlying block has been fully imported into the underlying client, + /// as implementations will fetch underlying runtime API data. + /// + /// This also returns the unencoded parachain block data, in case that is desired. + fn build_collation( + &self, + parent_header: &Block::Header, + block_hash: Block::Hash, + candidate: ParachainCandidate, + ) -> Option<(Collation, ParachainBlockData)>; + + /// Inform networking systems that the block should be announced after an appropriate + /// signal has been received. This returns the sending half of the signal. + fn announce_with_barrier( + &self, + block_hash: Block::Hash, + ) -> oneshot::Sender; +} + +/// The [`CollatorService`] provides common utilities for parachain consensus and authoring. +/// +/// This includes logic for checking the block status of arbitrary parachain headers +/// gathered from the relay chain state, creating full [`Collation`]s to be shared with validators, +/// and distributing new parachain blocks along the network. +pub struct CollatorService { + block_status: Arc, + wait_to_announce: Arc>>, + runtime_api: Arc, +} + +impl Clone for CollatorService { + fn clone(&self) -> Self { + Self { + block_status: self.block_status.clone(), + wait_to_announce: self.wait_to_announce.clone(), + runtime_api: self.runtime_api.clone(), + } + } +} + +impl CollatorService +where + Block: BlockT, + BS: BlockBackend, + RA: ProvideRuntimeApi, + RA::Api: CollectCollationInfo, +{ + /// Create a new instance. + pub fn new( + block_status: Arc, + spawner: Arc, + announce_block: Arc>) + Send + Sync>, + runtime_api: Arc, + ) -> Self { + let wait_to_announce = Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block))); + + Self { block_status, wait_to_announce, runtime_api } + } + + /// Checks the status of the given block hash in the Parachain. + /// + /// Returns `true` if the block could be found and is good to be build on. + pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool { + match self.block_status.block_status(hash) { + Ok(BlockStatus::Queued) => { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Skipping candidate production, because block is still queued for import.", + ); + false + }, + Ok(BlockStatus::InChainWithState) => true, + Ok(BlockStatus::InChainPruned) => { + tracing::error!( + target: LOG_TARGET, + "Skipping candidate production, because block `{:?}` is already pruned!", + hash, + ); + false + }, + Ok(BlockStatus::KnownBad) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + "Block is tagged as known bad and is included in the relay chain! Skipping candidate production!", + ); + false + }, + Ok(BlockStatus::Unknown) => { + if header.number().is_zero() { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + "Could not find the header of the genesis block in the database!", + ); + } else { + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Skipping candidate production, because block is unknown.", + ); + } + false + }, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?e, + "Failed to get block status.", + ); + false + }, + } + } + + /// Fetch the collation info from the runtime. + /// + /// Returns `Ok(Some(_))` on success, `Err(_)` on error or `Ok(None)` if the runtime api isn't implemented by the runtime. + pub fn fetch_collation_info( + &self, + block_hash: Block::Hash, + header: &Block::Header, + ) -> Result, sp_api::ApiError> { + let runtime_api = self.runtime_api.runtime_api(); + + let api_version = + match runtime_api.api_version::>(block_hash)? { + Some(version) => version, + None => { + tracing::error!( + target: LOG_TARGET, + "Could not fetch `CollectCollationInfo` runtime api version." + ); + return Ok(None) + }, + }; + + let collation_info = if api_version < 2 { + #[allow(deprecated)] + runtime_api + .collect_collation_info_before_version_2(block_hash)? + .into_latest(header.encode().into()) + } else { + runtime_api.collect_collation_info(block_hash, header)? + }; + + Ok(Some(collation_info)) + } + + /// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires + /// that the underlying block has been fully imported into the underlying client, + /// as it fetches underlying runtime API data. + /// + /// This also returns the unencoded parachain block data, in case that is desired. + pub fn build_collation( + &self, + parent_header: &Block::Header, + block_hash: Block::Hash, + candidate: ParachainCandidate, + ) -> Option<(Collation, ParachainBlockData)> { + let (header, extrinsics) = candidate.block.deconstruct(); + + let compact_proof = match candidate + .proof + .into_compact_proof::>(*parent_header.state_root()) + { + Ok(proof) => proof, + Err(e) => { + tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e); + return None + }, + }; + + // Create the parachain block data for the validators. + let collation_info = self + .fetch_collation_info(block_hash, &header) + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Failed to collect collation info.", + ) + }) + .ok() + .flatten()?; + + let block_data = ParachainBlockData::::new(header, extrinsics, compact_proof); + + let pov = polkadot_node_primitives::maybe_compress_pov(PoV { + block_data: BlockData(block_data.encode()), + }); + + let upward_messages = collation_info + .upward_messages + .try_into() + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`", + ) + }) + .ok()?; + let horizontal_messages = collation_info + .horizontal_messages + .try_into() + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + error = ?e, + "Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`", + ) + }) + .ok()?; + + let collation = Collation { + upward_messages, + new_validation_code: collation_info.new_validation_code, + processed_downward_messages: collation_info.processed_downward_messages, + horizontal_messages, + hrmp_watermark: collation_info.hrmp_watermark, + head_data: collation_info.head_data, + proof_of_validity: MaybeCompressedPoV::Compressed(pov), + }; + + Some((collation, block_data)) + } + + /// Inform the networking systems that the block should be announced after an appropriate + /// signal has been received. This returns the sending half of the signal. + pub fn announce_with_barrier( + &self, + block_hash: Block::Hash, + ) -> oneshot::Sender { + let (result_sender, signed_stmt_recv) = oneshot::channel(); + self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv); + result_sender + } +} + +impl ServiceInterface for CollatorService +where + Block: BlockT, + BS: BlockBackend, + RA: ProvideRuntimeApi, + RA::Api: CollectCollationInfo, +{ + fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool { + CollatorService::check_block_status(self, hash, header) + } + + fn build_collation( + &self, + parent_header: &Block::Header, + block_hash: Block::Hash, + candidate: ParachainCandidate, + ) -> Option<(Collation, ParachainBlockData)> { + CollatorService::build_collation(self, parent_header, block_hash, candidate) + } + + fn announce_with_barrier( + &self, + block_hash: Block::Hash, + ) -> oneshot::Sender { + CollatorService::announce_with_barrier(self, block_hash) + } +} diff --git a/client/consensus/aura/Cargo.toml b/client/consensus/aura/Cargo.toml index f284391494f..114e2ebed5b 100644 --- a/client/consensus/aura/Cargo.toml +++ b/client/consensus/aura/Cargo.toml @@ -27,8 +27,19 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-timestamp = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } # Cumulus cumulus-client-consensus-common = { path = "../common" } +cumulus-relay-chain-interface = { path = "../../relay-chain-interface" } +cumulus-client-consensus-proposer = { path = "../proposer" } cumulus-primitives-core = { path = "../../../primitives/core" } +cumulus-primitives-parachain-inherent = { path = "../../../primitives/parachain-inherent" } +cumulus-client-collator = { path = "../../collator" } + +# Polkadot +polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index b95b5ecf39d..416ae776da1 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -50,6 +50,8 @@ pub use import_queue::{build_verifier, import_queue, BuildVerifierParams, Import pub use sc_consensus_aura::{slot_duration, AuraVerifier, BuildAuraWorkerParams, SlotProportion}; pub use sc_consensus_slots::InherentDataProviderExt; +pub mod unstable_reimpl; + const LOG_TARGET: &str = "aura::cumulus"; /// The implementation of the AURA consensus for parachains. diff --git a/client/consensus/aura/src/unstable_reimpl.rs b/client/consensus/aura/src/unstable_reimpl.rs new file mode 100644 index 00000000000..f9602a363bf --- /dev/null +++ b/client/consensus/aura/src/unstable_reimpl.rs @@ -0,0 +1,529 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! The AuRa consensus algorithm for parachains. +//! +//! This extends the Substrate provided AuRa consensus implementation to make it compatible for +//! parachains. This provides the option to run a "bare" relay-chain driven Aura implementation, +//! but also exposes the core functionalities separately to be composed into more complex implementations. +//! +//! For more information about AuRa, the Substrate crate should be checked. + +use codec::{Decode, Encode}; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{ParachainBlockImportMarker, ParachainCandidate}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_core::{ + relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData, +}; +use cumulus_primitives_parachain_inherent::ParachainInherentData; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_node_primitives::{CollationResult, MaybeCompressedPoV}; +use polkadot_overseer::Handle as OverseerHandle; +use polkadot_primitives::{CollatorPair, Id as ParaId}; + +use futures::prelude::*; +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; +use sc_consensus::{ + import_queue::{BasicQueue, Verifier as VerifierT}, + BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction, +}; +use sc_consensus_aura::standalone as aura_internal; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE}; +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_blockchain::HeaderBackend; +use sp_consensus::{error::Error as ConsensusError, BlockOrigin, SyncOracle}; +use sp_consensus_aura::{AuraApi, Slot, SlotDuration}; +use sp_core::crypto::Pair; +use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider}; +use sp_keystore::KeystorePtr; +use sp_runtime::{ + generic::Digest, + traits::{Block as BlockT, HashFor, Header as HeaderT, Member}, +}; +use sp_state_machine::StorageChanges; +use std::{convert::TryFrom, error::Error, fmt::Debug, hash::Hash, sync::Arc, time::Duration}; + +/// Parameters for [`run_bare_relay_driven`]. +pub struct Params { + pub create_inherent_data_providers: CIDP, + pub block_import: BI, + pub para_client: Arc, + pub relay_client: Arc, + pub sync_oracle: SO, + pub keystore: KeystorePtr, + pub key: CollatorPair, + pub para_id: ParaId, + pub overseer_handle: OverseerHandle, + pub slot_duration: SlotDuration, + pub proposer: Proposer, + pub collator_service: CS, +} + +/// Run bare Aura consensus as a relay-chain-driven collator. +pub async fn run_bare_relay_driven( + params: Params, +) where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: AuraApi + CollectCollationInfo, + RClient: RelayChainInterface, + CIDP: CreateInherentDataProviders + 'static, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + SO: SyncOracle + Send + Sync + Clone + 'static, + Proposer: ProposerInterface, + Proposer::Transaction: Sync, + CS: CollatorServiceInterface, + P: Pair + Send + Sync, + P::Public: AppPublic + Hash + Member + Encode + Decode, + P::Signature: TryFrom> + Hash + Member + Encode + Decode, +{ + let mut proposer = params.proposer; + let mut block_import = params.block_import; + + let mut collation_requests = cumulus_client_collator::relay_chain_driven::init( + params.key, + params.para_id, + params.overseer_handle, + ) + .await; + + while let Some(request) = collation_requests.next().await { + macro_rules! reject_with_error { + ($err:expr) => {{ + request.complete(None); + tracing::error!(target: crate::LOG_TARGET, err = ?{ $err }); + continue; + }}; + } + + let validation_data = request.persisted_validation_data(); + + let parent_header = match Block::Header::decode(&mut &validation_data.parent_head.0[..]) { + Ok(x) => x, + Err(e) => reject_with_error!(e), + }; + + let parent_hash = parent_header.hash(); + + if !params.collator_service.check_block_status(parent_hash, &parent_header) { + continue + } + + let claim = match claim_slot::<_, _, P>( + &*params.para_client, + parent_hash, + params.slot_duration, + ¶ms.keystore, + ) + .await + { + Ok(None) => continue, + Ok(Some(c)) => c, + Err(e) => reject_with_error!(e), + }; + + let (parachain_inherent_data, other_inherent_data) = match create_inherent_data( + *request.relay_parent(), + &validation_data, + parent_hash, + params.para_id, + ¶ms.relay_client, + ¶ms.create_inherent_data_providers, + ) + .await + { + Ok(x) => x, + Err(e) => reject_with_error!(e), + }; + + let proposal = match proposer + .propose( + &parent_header, + ¶chain_inherent_data, + other_inherent_data, + Digest { logs: vec![claim.pre_digest] }, + // TODO [https://github.com/paritytech/cumulus/issues/2439] + // We should call out to a pluggable interface that provides + // the proposal duration. + Duration::from_millis(500), + // Set the block limit to 50% of the maximum PoV size. + // + // TODO: If we got benchmarking that includes the proof size, + // we should be able to use the maximum pov size. + Some((validation_data.max_pov_size / 2) as usize), + ) + .await + { + Ok(p) => p, + Err(e) => reject_with_error!(e), + }; + + let sealed_importable = match seal::<_, _, P>( + proposal.block, + proposal.storage_changes, + &claim.author_pub, + ¶ms.keystore, + ) { + Ok(s) => s, + Err(e) => reject_with_error!(e), + }; + + let post_hash = sealed_importable.post_hash(); + let block = Block::new( + sealed_importable.post_header(), + sealed_importable + .body + .as_ref() + .expect("body always created with this `propose` fn; qed") + .clone(), + ); + + if let Err(e) = block_import.import_block(sealed_importable).await { + reject_with_error!(e); + } + + let response = if let Some((collation, b)) = params.collator_service.build_collation( + &parent_header, + post_hash, + ParachainCandidate { block, proof: proposal.proof }, + ) { + tracing::info!( + target: crate::LOG_TARGET, + "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}", + b.header().encode().len() as f64 / 1024f64, + b.extrinsics().encode().len() as f64 / 1024f64, + b.storage_proof().encode().len() as f64 / 1024f64, + ); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: crate::LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + let result_sender = params.collator_service.announce_with_barrier(post_hash); + Some(CollationResult { collation, result_sender: Some(result_sender) }) + } else { + None + }; + + request.complete(response); + } +} + +fn slot_now(slot_duration: SlotDuration) -> Slot { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time().timestamp(); + Slot::from_timestamp(timestamp, slot_duration) +} + +/// A claim on an Aura slot. +struct SlotClaim { + author_pub: Pub, + pre_digest: sp_runtime::DigestItem, +} + +async fn claim_slot( + client: &C, + parent_hash: B::Hash, + slot_duration: SlotDuration, + keystore: &KeystorePtr, +) -> Result>, Box> +where + B: BlockT, + C: ProvideRuntimeApi + Send + Sync + 'static, + C::Api: AuraApi, + P: Pair, + P::Public: Encode + Decode, + P::Signature: Encode + Decode, +{ + // load authorities + let authorities = client.runtime_api().authorities(parent_hash).map_err(Box::new)?; + + // Determine the current slot. + let slot_now = slot_now(slot_duration); + + // Try to claim the slot locally. + let author_pub = { + let res = aura_internal::claim_slot::

(slot_now, &authorities, keystore).await; + match res { + Some(p) => p, + None => return Ok(None), + } + }; + + // Produce the pre-digest. + let pre_digest = aura_internal::pre_digest::

(slot_now); + + Ok(Some(SlotClaim { author_pub, pre_digest })) +} + +async fn create_inherent_data( + relay_parent: PHash, + validation_data: &PersistedValidationData, + parent_hash: B::Hash, + para_id: ParaId, + relay_chain_interface: &impl RelayChainInterface, + create_inherent_data_providers: &impl CreateInherentDataProviders, +) -> Result<(ParachainInherentData, InherentData), Box> { + let paras_inherent_data = ParachainInherentData::create_at( + relay_parent, + relay_chain_interface, + validation_data, + para_id, + ) + .await; + + let paras_inherent_data = match paras_inherent_data { + Some(p) => p, + None => + return Err(format!("Could not create paras inherent data at {:?}", relay_parent).into()), + }; + + let other_inherent_data = create_inherent_data_providers + .create_inherent_data_providers(parent_hash, ()) + .map_err(|e| e as Box) + .await? + .create_inherent_data() + .await + .map_err(Box::new)?; + + Ok((paras_inherent_data, other_inherent_data)) +} + +fn seal( + pre_sealed: B, + storage_changes: StorageChanges>, + author_pub: &P::Public, + keystore: &KeystorePtr, +) -> Result, Box> +where + P: Pair, + P::Signature: Encode + Decode + TryFrom>, + P::Public: AppPublic, +{ + let (pre_header, body) = pre_sealed.deconstruct(); + let pre_hash = pre_header.hash(); + let block_number = *pre_header.number(); + + // seal the block. + let block_import_params = { + let seal_digest = + aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?; + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header); + block_import_params.post_digests.push(seal_digest); + block_import_params.body = Some(body.clone()); + block_import_params.state_action = + StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes)); + block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); + block_import_params + }; + let post_hash = block_import_params.post_hash(); + + tracing::info!( + target: crate::LOG_TARGET, + "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.", + block_number, + post_hash, + pre_hash, + ); + + Ok(block_import_params) +} + +struct Verifier { + client: Arc, + create_inherent_data_providers: CIDP, + slot_duration: SlotDuration, + telemetry: Option, + _marker: std::marker::PhantomData<(Block, P)>, +} + +#[async_trait::async_trait] +impl VerifierT for Verifier +where + P: Pair, + P::Signature: Encode + Decode, + P::Public: Encode + Decode + PartialEq + Clone + Debug, + Block: BlockT, + Client: ProvideRuntimeApi + Send + Sync, + >::Api: BlockBuilderApi + AuraApi, + + CIDP: CreateInherentDataProviders, +{ + async fn verify( + &mut self, + mut block_params: BlockImportParams, + ) -> Result, String> { + // Skip checks that include execution, if being told so, or when importing only state. + // + // This is done for example when gap syncing and it is expected that the block after the gap + // was checked/chosen properly, e.g. by warp syncing to this block using a finality proof. + if block_params.state_action.skip_execution_checks() || block_params.with_state() { + return Ok(block_params) + } + + let post_hash = block_params.header.hash(); + let parent_hash = *block_params.header.parent_hash(); + + // check seal and update pre-hash/post-hash + { + let authorities = aura_internal::fetch_authorities(self.client.as_ref(), parent_hash) + .map_err(|e| { + format!("Could not fetch authorities at {:?}: {}", parent_hash, e) + })?; + + let slot_now = slot_now(self.slot_duration); + let res = aura_internal::check_header_slot_and_seal::( + slot_now, + block_params.header, + &authorities, + ); + + match res { + Ok((pre_header, _slot, seal_digest)) => { + telemetry!( + self.telemetry; + CONSENSUS_TRACE; + "aura.checked_and_importing"; + "pre_header" => ?pre_header, + ); + + block_params.header = pre_header; + block_params.post_digests.push(seal_digest); + block_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); + block_params.post_hash = Some(post_hash); + }, + Err(aura_internal::SealVerificationError::Deferred(hdr, slot)) => { + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "aura.header_too_far_in_future"; + "hash" => ?post_hash, + "a" => ?hdr, + "b" => ?slot, + ); + + return Err(format!( + "Rejecting block ({:?}) from future slot {:?}", + post_hash, slot + )) + }, + Err(e) => + return Err(format!( + "Rejecting block ({:?}) with invalid seal ({:?})", + post_hash, e + )), + } + } + + // check inherents. + if let Some(body) = block_params.body.clone() { + let block = Block::new(block_params.header.clone(), body); + let create_inherent_data_providers = self + .create_inherent_data_providers + .create_inherent_data_providers(parent_hash, ()) + .await + .map_err(|e| format!("Could not create inherent data {:?}", e))?; + + let inherent_data = create_inherent_data_providers + .create_inherent_data() + .await + .map_err(|e| format!("Could not create inherent data {:?}", e))?; + + let inherent_res = self + .client + .runtime_api() + .check_inherents_with_context( + parent_hash, + block_params.origin.into(), + block, + inherent_data, + ) + .map_err(|e| format!("Unable to check block inherents {:?}", e))?; + + if !inherent_res.ok() { + for (i, e) in inherent_res.into_errors() { + match create_inherent_data_providers.try_handle_error(&i, &e).await { + Some(res) => res.map_err(|e| format!("Inherent Error {:?}", e))?, + None => + return Err(format!( + "Unknown inherent error, source {:?}", + String::from_utf8_lossy(&i[..]) + )), + } + } + } + } + + Ok(block_params) + } +} + +/// Start an import queue for a Cumulus node which checks blocks' seals and inherent data. +/// +/// Pass in only inherent data providers which don't include aura or parachain consensus inherents, +/// e.g. things like timestamp and custom inherents for the runtime. +/// +/// The others are generated explicitly internally. +/// +/// This should only be used for runtimes where the runtime does not check all inherents and +/// seals in `execute_block` (see ) +pub fn fully_verifying_import_queue( + client: Arc, + block_import: I, + create_inherent_data_providers: CIDP, + slot_duration: SlotDuration, + spawner: &impl sp_core::traits::SpawnEssentialNamed, + registry: Option<&substrate_prometheus_endpoint::Registry>, + telemetry: Option, +) -> BasicQueue +where + P: Pair, + P::Signature: Encode + Decode, + P::Public: Encode + Decode + PartialEq + Clone + Debug, + I: BlockImport + + ParachainBlockImportMarker + + Send + + Sync + + 'static, + I::Transaction: Send, + Client: ProvideRuntimeApi + Send + Sync + 'static, + >::Api: BlockBuilderApi + AuraApi, + CIDP: CreateInherentDataProviders + 'static, +{ + let verifier = Verifier:: { + client, + create_inherent_data_providers, + slot_duration, + telemetry, + _marker: std::marker::PhantomData, + }; + + BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry) +} diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index d0bc28171fb..02b36320062 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -18,8 +18,10 @@ sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "mas sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" } +substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } # Polkadot polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs new file mode 100644 index 00000000000..948fe065c42 --- /dev/null +++ b/client/consensus/common/src/import_queue.rs @@ -0,0 +1,77 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! (unstable) Composable utilities for constructing import queues for parachains. +//! +//! Unlike standalone chains, parachains have the requirement that all consensus logic +//! must be checked within the runtime. This property means that work which is normally +//! done in the import queue per-block, such as checking signatures, quorums, and whether +//! inherent extrinsics were constructed faithfully do not need to be done, per se. +//! +//! It may seem that it would be beneficial for the client to do these checks regardless, +//! but in practice this means that clients would just reject blocks which are _valid_ according +//! to their Parachain Validation Function, which is the ultimate source of consensus truth. +//! +//! However, parachain runtimes expose two different access points for executing blocks +//! in full nodes versus executing those blocks in the parachain validation environment. +//! At the time of writing, the inherent and consensus checks in most Cumulus runtimes +//! are only performed during parachain validation, not full node block execution. +//! +//! See for details. + +use sp_consensus::error::Error as ConsensusError; +use sp_runtime::traits::Block as BlockT; + +use sc_consensus::{ + block_import::{BlockImport, BlockImportParams}, + import_queue::{BasicQueue, Verifier}, +}; + +use crate::ParachainBlockImportMarker; + +/// A [`Verifier`] for blocks which verifies absolutely nothing. +/// +/// This should only be used when the runtime is responsible for checking block seals and inherents. +pub struct VerifyNothing; + +#[async_trait::async_trait] +impl Verifier for VerifyNothing { + async fn verify( + &mut self, + params: BlockImportParams, + ) -> Result, String> { + Ok(params) + } +} + +/// An import queue which does no verification. +/// +/// This should only be used when the runtime is responsible for checking block seals and inherents. +pub fn verify_nothing_import_queue( + block_import: I, + spawner: &impl sp_core::traits::SpawnEssentialNamed, + registry: Option<&substrate_prometheus_endpoint::Registry>, +) -> BasicQueue +where + I: BlockImport + + ParachainBlockImportMarker + + Send + + Sync + + 'static, + I::Transaction: Send, +{ + BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry) +} diff --git a/client/consensus/common/src/lib.rs b/client/consensus/common/src/lib.rs index 86a781adc01..b74829e191f 100644 --- a/client/consensus/common/src/lib.rs +++ b/client/consensus/common/src/lib.rs @@ -32,6 +32,8 @@ pub use parachain_consensus::run_parachain_consensus; use level_monitor::LevelMonitor; pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT}; +pub mod import_queue; + /// The result of [`ParachainConsensus::produce_candidate`]. pub struct ParachainCandidate { /// The block that was built for this candidate. diff --git a/client/consensus/proposer/Cargo.toml b/client/consensus/proposer/Cargo.toml new file mode 100644 index 00000000000..7dff8d31950 --- /dev/null +++ b/client/consensus/proposer/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "cumulus-client-consensus-proposer" +description = "A Substrate `Proposer` for building parachain blocks" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2021" + +[dependencies] +anyhow = "1.0" +async-trait = "0.1.68" +thiserror = "1.0.40" + +# Substrate +sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } + +# Cumulus +cumulus-primitives-parachain-inherent = { path = "../../../primitives/parachain-inherent" } diff --git a/client/consensus/proposer/src/lib.rs b/client/consensus/proposer/src/lib.rs new file mode 100644 index 00000000000..514ad58da74 --- /dev/null +++ b/client/consensus/proposer/src/lib.rs @@ -0,0 +1,137 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! The Cumulus [`Proposer`] is a wrapper around a Substrate [`sp_consensus::Environment`] +//! for creating new parachain blocks. +//! +//! This utility is designed to be composed within any collator consensus algorithm. + +use async_trait::async_trait; + +use cumulus_primitives_parachain_inherent::ParachainInherentData; +use sp_consensus::{EnableProofRecording, Environment, Proposal, Proposer as SubstrateProposer}; +use sp_inherents::InherentData; +use sp_runtime::{traits::Block as BlockT, Digest}; +use sp_state_machine::StorageProof; + +use std::{fmt::Debug, time::Duration}; + +/// Errors that can occur when proposing a parachain block. +#[derive(thiserror::Error, Debug)] +#[error(transparent)] +pub struct Error { + inner: anyhow::Error, +} + +impl Error { + /// Create an error tied to the creation of a proposer. + pub fn proposer_creation(err: impl Into) -> Self { + Error { inner: err.into().context("Proposer Creation") } + } + + /// Create an error tied to the proposing logic itself. + pub fn proposing(err: impl Into) -> Self { + Error { inner: err.into().context("Proposing") } + } +} + +/// A type alias for easily referring to the type of a proposal produced by a specific +/// [`Proposer`]. +pub type ProposalOf = Proposal>::Transaction, StorageProof>; + +/// An interface for proposers. +#[async_trait] +pub trait ProposerInterface { + /// The underlying DB transaction type produced with the block proposal. + type Transaction: Default + Send + 'static; + + /// Propose a collation using the supplied `InherentData` and the provided + /// `ParachainInherentData`. + /// + /// Also specify any required inherent digests, the maximum proposal duration, + /// and the block size limit in bytes. See the documentation on [`sp_consensus::Proposer::propose`] + /// for more details on how to interpret these parameters. + /// + /// The `InherentData` and `Digest` are left deliberately general in order to accommodate + /// all possible collator selection algorithms or inherent creation mechanisms, + /// while the `ParachainInherentData` is made explicit so it will be constructed appropriately. + /// + /// If the `InherentData` passed into this function already has a `ParachainInherentData`, + /// this should throw an error. + async fn propose( + &mut self, + parent_header: &Block::Header, + paras_inherent_data: &ParachainInherentData, + other_inherent_data: InherentData, + inherent_digests: Digest, + max_duration: Duration, + block_size_limit: Option, + ) -> Result, Error>; +} + +/// A simple wrapper around a Substrate proposer for creating collations. +pub struct Proposer { + inner: T, + _marker: std::marker::PhantomData, +} + +impl Proposer { + /// Create a new Cumulus [`Proposer`]. + pub fn new(inner: T) -> Self { + Proposer { inner, _marker: std::marker::PhantomData } + } +} + +#[async_trait] +impl ProposerInterface for Proposer +where + B: sp_runtime::traits::Block, + T: Environment + Send, + T::Error: Send + Sync + 'static, + T::Proposer: SubstrateProposer, + >::Error: Send + Sync + 'static, +{ + type Transaction = <>::Proposer as SubstrateProposer>::Transaction; + + async fn propose( + &mut self, + parent_header: &B::Header, + paras_inherent_data: &ParachainInherentData, + other_inherent_data: InherentData, + inherent_digests: Digest, + max_duration: Duration, + block_size_limit: Option, + ) -> Result, Error> { + let proposer = self + .inner + .init(parent_header) + .await + .map_err(|e| Error::proposer_creation(anyhow::Error::new(e)))?; + + let mut inherent_data = other_inherent_data; + inherent_data + .put_data( + cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER, + ¶s_inherent_data, + ) + .map_err(|e| Error::proposing(anyhow::Error::new(e)))?; + + proposer + .propose(inherent_data, inherent_digests, max_duration, block_size_limit) + .await + .map_err(|e| Error::proposing(anyhow::Error::new(e)).into()) + } +}