Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
dmp-queue: Store messages if already processed more than the maximum (#…
Browse files Browse the repository at this point in the history
…2343) (#2347)

* dmp-queue: Store messages if already processed more than the maximum

* Put new event at the end

Co-authored-by: Bastian Köcher <git@kchr.de>
  • Loading branch information
EgorPopelyaev and bkchr committed Mar 20, 2023
1 parent b983cf5 commit 7d3a913
Showing 1 changed file with 81 additions and 39 deletions.
120 changes: 81 additions & 39 deletions pallets/dmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ pub mod pallet {
},
/// Downward message from the overweight queue was executed.
OverweightServiced { overweight_index: OverweightIndex, weight_used: Weight },
/// The maximum number of downward messages was.
MaxMessagesExhausted { message_id: MessageId },
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -307,46 +309,53 @@ pub mod pallet {
};

for (i, (sent_at, data)) in iter.enumerate() {
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
break
}
if maybe_enqueue_page.is_none() {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit = Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
if messages_processed >= MAX_MESSAGES_PER_BLOCK {
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));

Self::deposit_event(Event::MaxMessagesExhausted {
message_id: sp_io::hashing::blake2_256(&data),
});
} else {
// We're not currently enqueuing - try to execute inline.
let remaining_weight = limit.saturating_sub(used);
messages_processed += 1;
match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
Ok(consumed) => used += consumed,
Err((message_id, required_weight)) =>
// Too much weight required right now.
{
let is_under_limit =
Overweight::<T>::count() < MAX_OVERWEIGHT_MESSAGES;
used.saturating_accrue(T::DbWeight::get().reads(1));
if required_weight.any_gt(config.max_individual) && is_under_limit {
// overweight - add to overweight queue and continue with
// message execution.
let overweight_index = page_index.overweight_count;
Overweight::<T>::insert(overweight_index, (sent_at, data));
Self::deposit_event(Event::OverweightEnqueued {
message_id,
overweight_index,
required_weight,
});
page_index.overweight_count += 1;
// Not needed for control flow, but only to ensure that the compiler
// understands that we won't attempt to re-use `data` later.
continue
} else {
// not overweight. stop executing inline and enqueue normally
// from here on.
let item_count_left = item_count.saturating_sub(i);
maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
Self::deposit_event(Event::WeightExhausted {
message_id,
remaining_weight,
required_weight,
});
}
},
}
}
}
// Cannot be an `else` here since the `maybe_enqueue_page` may have changed.
Expand Down Expand Up @@ -889,4 +898,37 @@ mod tests {
assert_eq!(pages_queued(), 1);
});
}

#[test]
fn handle_max_messages_per_block() {
new_test_ext().execute_with(|| {
enqueue(&vec![msg(1000), msg(1001)]);
enqueue(&vec![msg(1002), msg(1003)]);
enqueue(&vec![msg(1004), msg(1005)]);

let incoming = (0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg(1006 + i as u64))
.collect::<Vec<_>>();
handle_messages(&incoming, Weight::from_parts(25000, 25000));

assert_eq!(
take_trace(),
(0..MAX_MESSAGES_PER_BLOCK)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
assert_eq!(pages_queued(), 1);

handle_messages(&[], Weight::from_parts(25000, 25000));
assert_eq!(
take_trace(),
(MAX_MESSAGES_PER_BLOCK..MAX_MESSAGES_PER_BLOCK + 6)
.into_iter()
.map(|i| msg_complete(1000 + i as u64))
.collect::<Vec<_>>(),
);
});
}
}

0 comments on commit 7d3a913

Please sign in to comment.