Skip to content

Commit

Permalink
Fix Hermes retrying mechanism not regenerating messages (#1951)
Browse files Browse the repository at this point in the history
* Sketching out refactor

* Only relay operational data if `clear_interval` == 0

* Back to making changes to `process_pending`

* Pass `clear_interval` parameter to `process_pending` fn

* Add RelayPath parameter to `process_pending` fn

* Make `regenerate_operational_data` private.

* Call `process_pending` such that operational data can now be regenerated

* Fix clippy warnings

* Remove unnecessary comment

* Add changelog entry

* Update doc comment for `regenerate_operational_data` method

* Replace `clear_interval` param with `do_resubmit` in fn signatures

* Improve doc comments for the `process_pending` fn

* Introduce `Resubmit` type instead of boolean

* Document the interaction between `clear_interval` and `tx_confirmation` parameters

* Fix incorrect comment

* Switch from if on `Resubmit` to match

* Fix Queue::push_back method

Co-authored-by: Romain Ruetschi <romain@informal.systems>
  • Loading branch information
seanchen1991 and romac committed Apr 20, 2022
1 parent 8b5f0d3 commit 3e7b276
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Fixed Hermes retrying mechanism not regenerating operational data for messages ([#1792](https://github.com/informalsystems/ibc-rs/pull/1951))
4 changes: 3 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ clear_on_start = true
# Toggle the transaction confirmation mechanism.
# The tx confirmation mechanism periodically queries the `/tx_search` RPC
# endpoint to check that previously-submitted transactions
# (to any chain in this config file) have delivered successfully.
# (to any chain in this config file) have been successfully delivered.
# If they have not been, and `clear_interval = 0`, then those packets are
# queued up for re-submission.
# Experimental feature. Affects telemetry if set to false.
# [Default: true]
tx_confirmation = true
Expand Down
4 changes: 3 additions & 1 deletion relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use crate::chain::counterparty::check_channel_counterparty;
use crate::chain::handle::ChainHandle;
use crate::channel::{Channel, ChannelSide};
use crate::link::error::LinkError;
use crate::link::relay_path::RelayPath;

pub mod cli;
pub mod error;
pub mod operational_data;

mod pending;
mod relay_path;
mod relay_sender;
Expand All @@ -26,6 +26,8 @@ use tx_hashes::TxHashes;
// Re-export the telemetries summary
pub use relay_summary::RelaySummary;

pub use relay_path::{RelayPath, Resubmit};

#[derive(Clone, Debug)]
pub struct LinkParameters {
pub src_port_id: PortId,
Expand Down
52 changes: 38 additions & 14 deletions relayer/src/link/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ibc::events::IbcEvent;
use ibc::query::{QueryTxHash, QueryTxRequest};

use crate::error::Error as RelayerError;
use crate::link::error::LinkError;
use crate::link::{error::LinkError, RelayPath};
use crate::util::queue::Queue;
use crate::{
chain::handle::ChainHandle,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl<Chain: ChainHandle> PendingTxs<Chain> {
self.chain.id()
}

// Insert new pending transaction to the back of the queue.
/// Insert a new pending transaction to the back of the queue.
pub fn insert_new_pending_tx(&self, r: AsyncReply, od: OperationalData) {
let mut tx_hashes = Vec::new();
let mut error_events = Vec::new();
Expand Down Expand Up @@ -133,11 +133,17 @@ impl<Chain: ChainHandle> PendingTxs<Chain> {
Ok(Some(all_events))
}

/// Try and process one pending transaction if available.
pub fn process_pending(
/// Try and process one pending transaction within the given timeout duration if one
/// is available.
///
/// A `resubmit` closure is provided when the clear interval for packets is 0. If this closure
/// is provided, the pending transactions that fail to process within the given timeout duration
/// are resubmitted following the logic specified by the closure.
pub fn process_pending<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
timeout: Duration,
resubmit: impl FnOnce(OperationalData) -> Result<AsyncReply, LinkError>,
relay_path: &RelayPath<ChainA, ChainB>,
resubmit: Option<impl FnOnce(OperationalData) -> Result<AsyncReply, LinkError>>,
) -> Result<Option<RelaySummary>, LinkError> {
// We process pending transactions in a FIFO manner, so take from
// the front of the queue.
Expand Down Expand Up @@ -177,16 +183,34 @@ impl<Chain: ChainHandle> PendingTxs<Chain> {
// relayer to resubmit the transaction to the chain again.
error!("timed out while confirming {}", tx_hashes);

let resubmit_res = resubmit(pending.original_od.clone());

match resubmit_res {
Ok(reply) => {
self.insert_new_pending_tx(reply, pending.original_od);
Ok(None)
match resubmit {
Some(f) => {
// The pending tx needs to be resubmitted. This involves replacing the tx's
// stale operational data with a fresh copy and then applying the `resubmit`
// closure to it.
let new_od = relay_path
.regenerate_operational_data(pending.original_od.clone());

trace!("regenerated operational data for {}", tx_hashes);

match new_od.map(f) {
Some(Ok(reply)) => {
self.insert_new_pending_tx(reply, pending.original_od);
Ok(None)
}
Some(Err(e)) => {
self.pending_queue.push_back(pending);
Err(e)
}
None => {
// No operational data was regenerated; nothing to resubmit
Ok(None)
}
}
}
Err(e) => {
self.pending_queue.push_back(pending);
Err(e)
None => {
// `clear_interval != 0` such that resubmission has been disabled
Ok(None)
}
}
} else {
Expand Down
62 changes: 48 additions & 14 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ use crate::util::queue::Queue;

const MAX_RETRIES: usize = 5;

/// Whether or not to resubmit packets when pending transactions
/// fail to process within the given timeout duration.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Resubmit {
Yes,
No,
}

impl Resubmit {
/// Packet resubmission is enabled when the clear interval for packets is 0. Otherwise,
/// when the packet clear interval is > 0, the relayer will periodically clear unsent packets
/// such that resubmitting packets is not necessary.
pub fn from_clear_interval(clear_interval: u64) -> Self {
if clear_interval == 0 {
Self::Yes
} else {
Self::No
}
}
}

pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
channel: Channel<ChainA, ChainB>,

Expand Down Expand Up @@ -557,8 +578,8 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
Ok(S::Reply::empty())
}

/// Helper for managing retries of the `relay_from_operational_data` method.
/// Expects as input the initial operational data that failed to send.
/// Generates fresh operational data for a tx given the initial operational data
/// that failed to send.
///
/// Return value:
/// - `Some(..)`: a new operational data from which to retry sending,
Expand All @@ -567,7 +588,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
///
/// Side effects: may schedule a new operational data targeting the source chain, comprising
/// new timeout messages.
fn regenerate_operational_data(
pub(crate) fn regenerate_operational_data(
&self,
initial_odata: OperationalData,
) -> Option<OperationalData> {
Expand Down Expand Up @@ -1371,17 +1392,20 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
Ok(())
}

pub fn process_pending_txs(&self) -> RelaySummary {
/// Kicks off the process of relaying pending txs to the source and destination chains.
///
/// See [`Resubmit::from_clear_interval`] for more info about the `resubmit` parameter.
pub fn process_pending_txs(&self, resubmit: Resubmit) -> RelaySummary {
if !self.confirm_txes {
return RelaySummary::empty();
}

let mut summary_src = self.process_pending_txs_src().unwrap_or_else(|e| {
let mut summary_src = self.process_pending_txs_src(resubmit).unwrap_or_else(|e| {
error!("error processing pending events in source chain: {}", e);
RelaySummary::empty()
});

let summary_dst = self.process_pending_txs_dst().unwrap_or_else(|e| {
let summary_dst = self.process_pending_txs_dst(resubmit).unwrap_or_else(|e| {
error!(
"error processing pending events in destination chain: {}",
e
Expand All @@ -1393,23 +1417,33 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
summary_src
}

fn process_pending_txs_src(&self) -> Result<RelaySummary, LinkError> {
fn process_pending_txs_src(&self, resubmit: Resubmit) -> Result<RelaySummary, LinkError> {
let do_resubmit = match resubmit {
Resubmit::Yes => {
Some(|odata| self.relay_from_operational_data::<relay_sender::AsyncSender>(odata))
}
Resubmit::No => None,
};

let res = self
.pending_txs_src
.process_pending(pending::TIMEOUT, |odata| {
self.relay_from_operational_data::<relay_sender::AsyncSender>(odata)
})?
.process_pending(pending::TIMEOUT, self, do_resubmit)?
.unwrap_or_else(RelaySummary::empty);

Ok(res)
}

fn process_pending_txs_dst(&self) -> Result<RelaySummary, LinkError> {
fn process_pending_txs_dst(&self, resubmit: Resubmit) -> Result<RelaySummary, LinkError> {
let do_resubmit = match resubmit {
Resubmit::Yes => {
Some(|odata| self.relay_from_operational_data::<relay_sender::AsyncSender>(odata))
}
Resubmit::No => None,
};

let res = self
.pending_txs_dst
.process_pending(pending::TIMEOUT, |odata| {
self.relay_from_operational_data::<relay_sender::AsyncSender>(odata)
})?
.process_pending(pending::TIMEOUT, self, do_resubmit)?
.unwrap_or_else(RelaySummary::empty);

Ok(res)
Expand Down
12 changes: 6 additions & 6 deletions relayer/src/util/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::sync::{Arc, RwLock};

use crate::util::lock::LockExt;

// A lightweight wrapper type to RefCell<VecDeque<T>> so that
// we can safely mutate it with regular reference instead of
// mutable reference. We only expose subset of VecDeque methods
// that does not return any inner reference, so that the RefCell
// can never panic caused by simultaneous borrow and borrow_mut.
/// A lightweight wrapper type to RefCell<VecDeque<T>> so that
/// we can safely mutate it with regular reference instead of
/// mutable reference. We only expose subset of VecDeque methods
/// that does not return any inner reference, so that the RefCell
/// can never panic caused by simultaneous borrow and borrow_mut.
pub struct Queue<T>(Arc<RwLock<VecDeque<T>>>);

impl<T> Queue<T> {
Expand All @@ -24,7 +24,7 @@ impl<T> Queue<T> {
}

pub fn push_back(&self, val: T) {
self.0.acquire_write().push_front(val)
self.0.acquire_write().push_back(val)
}

pub fn push_front(&self, val: T) {
Expand Down
5 changes: 3 additions & 2 deletions relayer/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Mutex;
use tracing::error;

use crate::foreign_client::ForeignClient;
use crate::link::{Link, LinkParameters};
use crate::link::{Link, LinkParameters, Resubmit};
use crate::{
chain::handle::{ChainHandle, ChainHandlePair},
config::Config,
Expand Down Expand Up @@ -125,6 +125,7 @@ pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(
Ok(link) => {
let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded();
let link = Arc::new(Mutex::new(link));
let resubmit = Resubmit::from_clear_interval(packets_config.clear_interval);
let packet_task = packet::spawn_packet_cmd_worker(
cmd_rx,
link.clone(),
Expand All @@ -134,7 +135,7 @@ pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(
);
task_handles.push(packet_task);

let link_task = packet::spawn_packet_worker(path.clone(), link);
let link_task = packet::spawn_packet_worker(path.clone(), link, resubmit);
task_handles.push(link_task);

(Some(cmd_tx), None)
Expand Down
9 changes: 7 additions & 2 deletions relayer/src/worker/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ibc::Height;

use crate::chain::handle::ChainHandle;
use crate::foreign_client::HasExpiredOrFrozenError;
use crate::link::Resubmit;
use crate::link::{error::LinkError, Link};
use crate::object::Packet;
use crate::telemetry;
Expand Down Expand Up @@ -52,10 +53,13 @@ fn handle_link_error_in_task(e: LinkError) -> TaskError<RunError> {
}
}

/// Spawns a packet worker task in the background that handles the work of
/// processing pending txs between `ChainA` and `ChainB`.
pub fn spawn_packet_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
path: Packet,
// Mutex is used to prevent race condition between the packet workers
link: Arc<Mutex<Link<ChainA, ChainB>>>,
resubmit: Resubmit,
) -> TaskHandle {
let span = {
let relay_path = &link.lock().unwrap().a_to_b;
Expand All @@ -79,7 +83,7 @@ pub fn spawn_packet_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
.execute_schedule()
.map_err(handle_link_error_in_task)?;

let summary = relay_path.process_pending_txs();
let summary = relay_path.process_pending_txs(resubmit);

if !summary.is_empty() {
trace!("packet worker produced relay summary: {}", summary);
Expand Down Expand Up @@ -218,7 +222,8 @@ fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
}
}

let summary = link.a_to_b.process_pending_txs();
let resubmit = Resubmit::from_clear_interval(clear_interval);
let summary = link.a_to_b.process_pending_txs(resubmit);

if !summary.is_empty() {
trace!("produced relay summary: {:?}", summary);
Expand Down

0 comments on commit 3e7b276

Please sign in to comment.