Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Switch to relay_dispatch_queue_remaining_capacity #2608

Merged
merged 3 commits into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
554 changes: 296 additions & 258 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ pub mod pallet {
};

<PendingUpwardMessages<T>>::mutate(|up| {
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;
let queue_size = relevant_messaging_state.relay_dispatch_queue_size;

let available_capacity = cmp::min(
host_config.max_upward_queue_count.saturating_sub(count),
host_config.max_upward_message_num_per_candidate,
queue_size.remaining_count,
host_config.max_upward_message_num_per_candidate.into(),
);
let available_size = host_config.max_upward_queue_size.saturating_sub(size);
let available_size = queue_size.remaining_size;

// Count the number of messages we can possibly fit in the given constraints, i.e.
// available_capacity and available_size.
Expand Down Expand Up @@ -431,7 +431,7 @@ pub mod pallet {
.read_abridged_host_configuration()
.expect("Invalid host configuration in relay chain state proof");
let relevant_messaging_state = relay_state_proof
.read_messaging_state_snapshot()
.read_messaging_state_snapshot(&host_config)
.expect("Invalid messaging state in relay chain state proof");

<ValidationData<T>>::put(&vfp);
Expand Down
54 changes: 44 additions & 10 deletions pallets/parachain-system/src/relay_state_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ use sp_state_machine::{Backend, TrieBackend, TrieBackendBuilder};
use sp_std::vec::Vec;
use sp_trie::{HashDBT, MemoryDB, StorageProof, EMPTY_PREFIX};

/// The capacity of the upward message queue of a parachain on the relay chain.
// The field order should stay the same as the data can be found in the proof to ensure both are
// have the same encoded representation.
#[derive(Clone, Encode, Decode, TypeInfo, Default)]
pub struct RelayDispachQueueSize {
/// The number of additional messages that can be enqueued.
pub remaining_count: u32,
/// The total size of additional messages that can be enqueued.
pub remaining_size: u32,
}

/// A snapshot of some messaging related state of relay chain pertaining to the current parachain.
///
/// This data is essential for making sure that the parachain is aware of current resource use on
Expand All @@ -37,10 +48,7 @@ pub struct MessagingStateSnapshot {
pub dmq_mqc_head: relay_chain::Hash,

/// The current capacity of the upward message queue of the current parachain on the relay chain.
///
/// The capacity is represented by a tuple that consist of the `count` of the messages and the
/// `total_size` expressed as the sum of byte sizes of all messages in the queue.
pub relay_dispatch_queue_size: (u32, u32),
pub relay_dispatch_queue_size: RelayDispachQueueSize,

/// Information about all the inbound HRMP channels.
///
Expand Down Expand Up @@ -164,20 +172,46 @@ impl RelayChainStateProof {
/// Read the [`MessagingStateSnapshot`] from the relay chain state proof.
///
/// Returns an error if anything failed at reading or decoding.
pub fn read_messaging_state_snapshot(&self) -> Result<MessagingStateSnapshot, Error> {
pub fn read_messaging_state_snapshot(
&self,
host_config: &AbridgedHostConfiguration,
) -> Result<MessagingStateSnapshot, Error> {
let dmq_mqc_head: relay_chain::Hash = read_entry(
&self.trie_backend,
&relay_chain::well_known_keys::dmq_mqc_head(self.para_id),
Some(Default::default()),
)
.map_err(Error::DmqMqcHead)?;

let relay_dispatch_queue_size: (u32, u32) = read_entry(
let relay_dispatch_queue_size = read_optional_entry::<RelayDispachQueueSize, _>(
&self.trie_backend,
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
Some((0, 0)),
)
.map_err(Error::RelayDispatchQueueSize)?;
&relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(self.para_id)
.key,
);

// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
//
// When the relay chain and all parachains support `relay_dispatch_queue_remaining_capacity`,
// this code here needs to be removed and above needs to be changed to `read_entry` that
// returns an error if `relay_dispatch_queue_remaining_capacity` can not be found/decoded.
//
// For now we just fallback to the old dispatch queue size if there is an error.
let relay_dispatch_queue_size = match relay_dispatch_queue_size {
Ok(Some(r)) => r,
_ => {
let res = read_entry::<(u32, u32), _>(
ggwpez marked this conversation as resolved.
Show resolved Hide resolved
&self.trie_backend,
#[allow(deprecated)]
&relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
Some((0, 0)),
)
.map_err(Error::RelayDispatchQueueSize)?;

let remaining_count = host_config.max_upward_queue_count.saturating_sub(res.0);
let remaining_size = host_config.max_upward_queue_size.saturating_sub(res.1);
RelayDispachQueueSize { remaining_count, remaining_size }
},
};

let ingress_channel_index: Vec<ParaId> = read_entry(
&self.trie_backend,
Expand Down
6 changes: 3 additions & 3 deletions pallets/parachain-system/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ fn send_upward_message_num_per_candidate() {
BlockTests::new()
.with_relay_sproof_builder(|_, _, sproof| {
sproof.host_config.max_upward_message_num_per_candidate = 1;
sproof.relay_dispatch_queue_size = None;
sproof.relay_dispatch_queue_remaining_capacity = None;
})
.add_with_post_test(
1,
Expand Down Expand Up @@ -544,8 +544,8 @@ fn send_upward_message_relay_bottleneck() {
sproof.host_config.max_upward_queue_count = 5;

match relay_block_num {
1 => sproof.relay_dispatch_queue_size = Some((5, 0)),
2 => sproof.relay_dispatch_queue_size = Some((4, 0)),
1 => sproof.relay_dispatch_queue_remaining_capacity = Some((0, 2048)),
2 => sproof.relay_dispatch_queue_remaining_capacity = Some((1, 2048)),
_ => unreachable!(),
}
})
Expand Down
2 changes: 2 additions & 0 deletions parachains/integration-tests/emulated/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ decl_test_relay_chains! {
RuntimeOrigin: polkadot_runtime::RuntimeOrigin,
RuntimeCall: polkadot_runtime::RuntimeCall,
RuntimeEvent: polkadot_runtime::RuntimeEvent,
MessageQueue: polkadot_runtime::MessageQueue,
XcmConfig: polkadot_runtime::xcm_config::XcmConfig,
SovereignAccountOf: polkadot_runtime::xcm_config::SovereignAccountOf,
System: polkadot_runtime::System,
Expand All @@ -41,6 +42,7 @@ decl_test_relay_chains! {
RuntimeOrigin: kusama_runtime::RuntimeOrigin,
RuntimeCall: polkadot_runtime::RuntimeCall,
RuntimeEvent: kusama_runtime::RuntimeEvent,
MessageQueue: polkadot_runtime::MessageQueue,
XcmConfig: kusama_runtime::xcm_config::XcmConfig,
SovereignAccountOf: kusama_runtime::xcm_config::SovereignAccountOf,
System: kusama_runtime::System,
Expand Down
4 changes: 4 additions & 0 deletions primitives/parachain-inherent/src/client_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ async fn collect_relay_storage_proof(
relay_well_known_keys::CURRENT_SLOT.to_vec(),
relay_well_known_keys::ACTIVE_CONFIG.to_vec(),
relay_well_known_keys::dmq_mqc_head(para_id),
// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
// We need to keep this here until all parachains have migrated to `relay_dispatch_queue_remaining_capacity`.
#[allow(deprecated)]
relay_well_known_keys::relay_dispatch_queue_size(para_id),
relay_well_known_keys::relay_dispatch_queue_remaining_capacity(para_id).key,
relay_well_known_keys::hrmp_ingress_channel_index(para_id),
relay_well_known_keys::hrmp_egress_channel_index(para_id),
relay_well_known_keys::upgrade_go_ahead_signal(para_id),
Expand Down
11 changes: 7 additions & 4 deletions test/relay-sproof-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct RelayStateSproofBuilder {
pub host_config: AbridgedHostConfiguration,
pub dmq_mqc_head: Option<relay_chain::Hash>,
pub upgrade_go_ahead: Option<UpgradeGoAhead>,
pub relay_dispatch_queue_size: Option<(u32, u32)>,
pub relay_dispatch_queue_remaining_capacity: Option<(u32, u32)>,
pub hrmp_ingress_channel_index: Option<Vec<ParaId>>,
pub hrmp_egress_channel_index: Option<Vec<ParaId>>,
pub hrmp_channels: BTreeMap<relay_chain::HrmpChannelId, AbridgedHrmpChannel>,
Expand All @@ -65,7 +65,7 @@ impl Default for RelayStateSproofBuilder {
},
dmq_mqc_head: None,
upgrade_go_ahead: None,
relay_dispatch_queue_size: None,
relay_dispatch_queue_remaining_capacity: None,
hrmp_ingress_channel_index: None,
hrmp_egress_channel_index: None,
hrmp_channels: BTreeMap::new(),
Expand Down Expand Up @@ -124,9 +124,12 @@ impl RelayStateSproofBuilder {
dmq_mqc_head.encode(),
);
}
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_size {
if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_remaining_capacity {
insert(
relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),
relay_chain::well_known_keys::relay_dispatch_queue_remaining_capacity(
self.para_id,
)
.key,
relay_dispatch_queue_size.encode(),
);
}
Expand Down
1 change: 1 addition & 0 deletions xcm/xcm-emulator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sp-std = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master" }
pallet-message-queue = { git = "https://github.com/paritytech/substrate", branch = "master" }

cumulus-primitives-core = { path = "../../primitives/core"}
cumulus-pallet-xcmp-queue = { path = "../../pallets/xcmp-queue" }
Expand Down
109 changes: 62 additions & 47 deletions xcm/xcm-emulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ pub use casey::pascal;
pub use codec::Encode;
pub use frame_support::{
sp_runtime::BuildStorage,
traits::{Get, Hooks},
weights::Weight,
traits::{EnqueueMessage, Get, Hooks, ProcessMessage, ProcessMessageError, ServiceQueues},
weights::{Weight, WeightMeter},
};
pub use frame_system::AccountInfo;
pub use log;
Expand All @@ -41,13 +41,14 @@ pub use cumulus_primitives_core::{
pub use cumulus_primitives_parachain_inherent::ParachainInherentData;
pub use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
pub use cumulus_test_service::get_account_id_from_seed;
pub use pallet_message_queue;
pub use parachain_info;
pub use parachains_common::{AccountId, BlockNumber};

pub use polkadot_primitives;
pub use polkadot_runtime_parachains::{
dmp,
ump::{MessageId, UmpSink, XcmSink},
inclusion::{AggregateMessageOrigin, UmpQueueId},
};
pub use std::{collections::HashMap, thread::LocalKey};
pub use xcm::{v3::prelude::*, VersionedXcm};
Expand Down Expand Up @@ -164,7 +165,7 @@ pub trait NetworkComponent<N: Network> {
}
}

pub trait RelayChain: UmpSink {
pub trait RelayChain: ProcessMessage {
type Runtime;
type RuntimeOrigin;
type RuntimeCall;
Expand Down Expand Up @@ -198,14 +199,15 @@ macro_rules! decl_test_relay_chains {
genesis = $genesis:expr,
on_init = $on_init:expr,
runtime = {
Runtime: $($runtime:tt)::*,
RuntimeOrigin: $($runtime_origin:tt)::*,
RuntimeCall: $($runtime_call:tt)::*,
RuntimeEvent: $($runtime_event:tt)::*,
XcmConfig: $($xcm_config:tt)::*,
SovereignAccountOf: $($sovereign_acc_of:tt)::*,
System: $($system:tt)::*,
Balances: $($balances:tt)::*,
Runtime: $runtime:path,
RuntimeOrigin: $runtime_origin:path,
RuntimeCall: $runtime_call:path,
RuntimeEvent: $runtime_event:path,
MessageQueue: $mq:path,
XcmConfig: $xcm_config:path,
SovereignAccountOf: $sovereign_acc_of:path,
System: $system:path,
Balances: $balances:path,
},
pallets_extra = {
$($pallet_name:ident: $pallet_path:path,)*
Expand All @@ -218,14 +220,14 @@ macro_rules! decl_test_relay_chains {
pub struct $name;

impl RelayChain for $name {
type Runtime = $($runtime)::*;
type RuntimeOrigin = $($runtime_origin)::*;
type RuntimeCall = $($runtime_call)::*;
type RuntimeEvent = $($runtime_event)::*;
type XcmConfig = $($xcm_config)::*;
type SovereignAccountOf = $($sovereign_acc_of)::*;
type System = $($system)::*;
type Balances = $($balances)::*;
type Runtime = $runtime;
type RuntimeOrigin = $runtime_origin;
type RuntimeCall = $runtime_call;
type RuntimeEvent = $runtime_event;
type XcmConfig = $xcm_config;
type SovereignAccountOf = $sovereign_acc_of;
type System = $system;
type Balances = $balances;
}

$crate::paste::paste! {
Expand All @@ -242,31 +244,43 @@ macro_rules! decl_test_relay_chains {
}
}

$crate::__impl_xcm_handlers_for_relay_chain!($name);
$crate::__impl_test_ext_for_relay_chain!($name, $genesis, $on_init);
)+
};
}
impl $crate::ProcessMessage for $name {
type Origin = $crate::ParaId;

#[macro_export]
macro_rules! __impl_xcm_handlers_for_relay_chain {
($name:ident) => {
impl $crate::UmpSink for $name {
fn process_upward_message(
origin: $crate::ParaId,
msg: &[u8],
max_weight: $crate::Weight,
) -> Result<$crate::Weight, ($crate::MessageId, $crate::Weight)> {
use $crate::{TestExt, UmpSink};

Self::execute_with(|| {
$crate::XcmSink::<
$crate::XcmExecutor<<Self as RelayChain>::XcmConfig>,
<Self as RelayChain>::Runtime,
>::process_upward_message(origin, msg, max_weight)
})
fn process_message(
msg: &[u8],
para: Self::Origin,
meter: &mut $crate::WeightMeter,
) -> Result<bool, $crate::ProcessMessageError> {
use $crate::{Weight, AggregateMessageOrigin, UmpQueueId, ServiceQueues, EnqueueMessage};
use $mq as message_queue;
use $runtime_event as runtime_event;

Self::execute_with(|| {
<$mq as EnqueueMessage<AggregateMessageOrigin>>::enqueue_message(
msg.try_into().expect("Message too long"),
AggregateMessageOrigin::Ump(UmpQueueId::Para(para.clone()))
);

<$system>::reset_events();
<$mq as ServiceQueues>::service_queues(Weight::MAX);
let events = <$system>::events();
let event = events.last().expect("There must be at least one event");

match &event.event {
runtime_event::MessageQueue(
$crate::pallet_message_queue::Event::Processed {origin, ..}) => {
assert_eq!(origin, &AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
},
event => panic!("Unexpected event: {:#?}", event),
}
Ok(true)
})
}
}
}

$crate::__impl_test_ext_for_relay_chain!($name, $genesis, $on_init);
)+
};
}

Expand Down Expand Up @@ -800,12 +814,13 @@ macro_rules! decl_test_networks {
}

fn _process_upward_messages() {
use $crate::{UmpSink, Bounded};
use $crate::{Bounded, ProcessMessage, WeightMeter};
while let Some((from_para_id, msg)) = $crate::UPWARD_MESSAGES.with(|b| b.borrow_mut().get_mut(stringify!($name)).unwrap().pop_front()) {
let _ = <$relay_chain>::process_upward_message(
from_para_id.into(),
let mut weight_meter = WeightMeter::max_limit();
let _ = <$relay_chain>::process_message(
&msg[..],
$crate::Weight::max_value(),
from_para_id.into(),
&mut weight_meter,
);
}
}
Expand Down