Skip to content

Commit

Permalink
Fix invalid messages relay delivery transactions (#1940)
Browse files Browse the repository at this point in the history
* + failing test: previous_nonces_are_selected_if_reorg_happens_at_target_chain

* maybe fix for the issue?

* another fix

* previous_nonces_are_selected_if_reorg_happens_at_target_chain for MessageDeliveryStrategy

* remove commented code

* spelling
  • Loading branch information
svyatonik committed Mar 7, 2023
1 parent 8ad152b commit c18a758
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 72 deletions.
110 changes: 96 additions & 14 deletions relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ struct MessageDeliveryStrategy<P: MessageLane, SC, TC> {
/// Latest confirmed nonces at the source client + the header id where we have first met this
/// nonce.
latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
/// Target nonces from the source client.
/// Target nonces available at the **best** block of the target chain.
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
/// Basic delivery strategy.
strategy: MessageDeliveryStrategyBase<P>,
Expand Down Expand Up @@ -387,13 +387,11 @@ where
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) {
// best target nonces must always be ge than finalized target nonces
let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone());
target_nonces.nonces_data = nonces.nonces_data.clone();
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
self.target_nonces = Some(target_nonces);
let latest_nonce = nonces.latest_nonce;
self.target_nonces = Some(nonces);

self.strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: nonces.latest_nonce, nonces_data: () },
TargetClientNonces { latest_nonce, nonces_data: () },
race_state,
)
}
Expand Down Expand Up @@ -432,14 +430,16 @@ where
&self,
race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
let best_target_nonce = self.strategy.best_at_target()?;
let best_finalized_source_header_id_at_best_target =
race_state.best_finalized_source_header_id_at_best_target.clone()?;
let latest_confirmed_nonce_at_source = self
.latest_confirmed_nonces_at_source
.iter()
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
.last()
.map(|(_, nonce)| *nonce)?;
.map(|(_, nonce)| *nonce)
.unwrap_or(best_target_nonce);
let target_nonces = self.target_nonces.as_ref()?;

// There's additional condition in the message delivery race: target would reject messages
Expand Down Expand Up @@ -529,8 +529,8 @@ where
let lane_source_client = self.lane_source_client.clone();
let lane_target_client = self.lane_target_client.clone();

let maximal_source_queue_index =
self.strategy.maximal_available_source_queue_index(race_state)?;
let available_source_queue_indices =
self.strategy.available_source_queue_indices(race_state)?;
let source_queue = self.strategy.source_queue();

let reference = RelayMessagesBatchReference {
Expand All @@ -539,15 +539,13 @@ where
max_messages_size_in_single_batch,
lane_source_client: lane_source_client.clone(),
lane_target_client: lane_target_client.clone(),
best_target_nonce,
nonces_queue: source_queue.clone(),
nonces_queue_range: 0..maximal_source_queue_index + 1,
nonces_queue_range: available_source_queue_indices,
metrics: self.metrics_msg.clone(),
};

let range_end = MessageRaceLimits::decide(reference).await?;

let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end;
let selected_nonces = MessageRaceLimits::decide(reference).await?;
let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);

Some((
Expand Down Expand Up @@ -1057,4 +1055,88 @@ mod tests {
);
assert_eq!(strategy.required_source_header_at_target(&source_header_id), None);
}

#[async_std::test]
async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() {
// this is the copy of the similar test in the `mesage_race_strategy.rs`, but it also tests
// that the `MessageDeliveryStrategy` acts properly in the similar scenario

// tune parameters to allow 5 nonces per delivery transaction
let (mut state, mut strategy) = prepare_strategy();
strategy.max_unrewarded_relayer_entries_at_target = 5;
strategy.max_unconfirmed_nonces_at_target = 5;
strategy.max_messages_in_single_batch = 5;
strategy.max_messages_weight_in_single_batch = Weight::from_parts(5, 0);
strategy.max_messages_size_in_single_batch = 5;

// in this state we have 4 available nonces for delivery
assert_eq!(
strategy.select_nonces_to_deliver(state.clone()).await,
Some((
20..=23,
MessageProofParameters {
outbound_state_proof_required: false,
dispatch_weight: Weight::from_parts(4, 0),
}
)),
);

// let's say we have submitted 20..=23
state.nonces_submitted = Some(20..=23);

// then new nonce 24 appear at the source block 2
let new_nonce_24 = vec![(
24,
MessageDetails { dispatch_weight: Weight::from_parts(1, 0), size: 0, reward: 0 },
)]
.into_iter()
.collect();
let source_header_2 = header_id(2);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
strategy.source_nonces_updated(
source_header_2,
SourceClientNonces { new_nonces: new_nonce_24, confirmed_nonce: None },
);
// and nonce 23 appear at the best block of the target node (best finalized still has 0
// nonces)
let target_nonces_data = DeliveryRaceTargetNoncesData {
confirmed_nonce: 19,
unrewarded_relayers: UnrewardedRelayersState::default(),
};
let target_header_2 = header_id(2);
state.best_target_header_id = Some(target_header_2);
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 23, nonces_data: target_nonces_data.clone() },
&mut state,
);

// then best target header is retracted
strategy.best_target_nonces_updated(
TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
&mut state,
);

// ... and some fork with 19 delivered nonces is finalized
let target_header_2_fork = header_id(2_1);
state.best_finalized_source_header_id_at_source = Some(source_header_2);
state.best_finalized_source_header_id_at_best_target = Some(source_header_2);
state.best_target_header_id = Some(target_header_2_fork);
state.best_finalized_target_header_id = Some(target_header_2_fork);
strategy.finalized_target_nonces_updated(
TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() },
&mut state,
);

// now we have to select nonces 20..=23 for delivery again
assert_eq!(
strategy.select_nonces_to_deliver(state.clone()).await,
Some((
20..=24,
MessageProofParameters {
outbound_state_proof_required: false,
dispatch_weight: Weight::from_parts(5, 0),
}
)),
);
}
}
20 changes: 13 additions & 7 deletions relays/messages/src/message_race_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! enforcement strategy

use num_traits::Zero;
use std::ops::Range;
use std::ops::RangeInclusive;

use bp_messages::{MessageNonce, Weight};

Expand Down Expand Up @@ -76,14 +76,17 @@ pub struct RelayMessagesBatchReference<
pub lane_target_client: TargetClient,
/// Metrics reference.
pub metrics: Option<MessageLaneLoopMetrics>,
/// Best available nonce at the **best** target block. We do not want to deliver nonces
/// less than this nonce, even though the block may be retracted.
pub best_target_nonce: MessageNonce,
/// Source queue.
pub nonces_queue: SourceRangesQueue<
P::SourceHeaderHash,
P::SourceHeaderNumber,
MessageDetailsMap<P::SourceChainBalance>,
>,
/// Source queue range
pub nonces_queue_range: Range<usize>,
/// Range of indices within the `nonces_queue` that are available for selection.
pub nonces_queue_range: RangeInclusive<usize>,
}

/// Limits of the message race transactions.
Expand All @@ -97,14 +100,16 @@ impl MessageRaceLimits {
TargetClient: MessageLaneTargetClient<P>,
>(
reference: RelayMessagesBatchReference<P, SourceClient, TargetClient>,
) -> Option<MessageNonce> {
) -> Option<RangeInclusive<MessageNonce>> {
let mut hard_selected_count = 0;

let mut selected_weight = Weight::zero();
let mut selected_count: MessageNonce = 0;

let hard_selected_begin_nonce =
reference.nonces_queue[reference.nonces_queue_range.start].1.begin();
let hard_selected_begin_nonce = std::cmp::max(
reference.best_target_nonce + 1,
reference.nonces_queue[*reference.nonces_queue_range.start()].1.begin(),
);

// relay reference
let mut relay_reference = RelayReference {
Expand All @@ -129,6 +134,7 @@ impl MessageRaceLimits {
.nonces_queue
.range(reference.nonces_queue_range.clone())
.flat_map(|(_, ready_nonces)| ready_nonces.iter())
.filter(|(nonce, _)| **nonce >= hard_selected_begin_nonce)
.enumerate();
for (index, (nonce, details)) in all_ready_nonces {
relay_reference.index = index;
Expand Down Expand Up @@ -192,7 +198,7 @@ impl MessageRaceLimits {
if hard_selected_count != 0 {
let selected_max_nonce =
hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1;
Some(selected_max_nonce)
Some(hard_selected_begin_nonce..=selected_max_nonce)
} else {
None
}
Expand Down
Loading

0 comments on commit c18a758

Please sign in to comment.