diff --git a/.changelog/unreleased/improvements/2542-remove-ibcevent-height b/.changelog/unreleased/improvements/2542-remove-ibcevent-height new file mode 100644 index 0000000000..1d0c60765a --- /dev/null +++ b/.changelog/unreleased/improvements/2542-remove-ibcevent-height @@ -0,0 +1 @@ +- Remove height attribute from `IbcEvent` and its variants ([#2542](https://github.com/informalsystems/ibc-rs/issues/2542)) diff --git a/e2e/e2e/channel.py b/e2e/e2e/channel.py index b7c32aa2a3..3d2638f9e5 100644 --- a/e2e/e2e/channel.py +++ b/e2e/e2e/channel.py @@ -12,7 +12,6 @@ class TxChanOpenInitRes: connection_id: ConnectionId counterparty_channel_id: Optional[ChannelId] counterparty_port_id: PortId - height: BlockHeight port_id: PortId @@ -49,7 +48,6 @@ class TxChanOpenTryRes: connection_id: ConnectionId counterparty_channel_id: ChannelId counterparty_port_id: ChannelId - height: BlockHeight port_id: PortId @@ -88,7 +86,6 @@ class TxChanOpenAckRes: connection_id: ConnectionId counterparty_channel_id: ChannelId counterparty_port_id: ChannelId - height: BlockHeight port_id: PortId @@ -125,7 +122,6 @@ class TxChanOpenConfirmRes: connection_id: ConnectionId counterparty_channel_id: ChannelId counterparty_port_id: ChannelId - height: BlockHeight port_id: PortId @@ -161,7 +157,6 @@ class TxChanCloseInitRes: connection_id: ConnectionId counterparty_channel_id: ChannelId counterparty_port_id: ChannelId - height: BlockHeight port_id: PortId @@ -198,7 +193,6 @@ class TxChanCloseConfirmRes: connection_id: ConnectionId counterparty_channel_id: ChannelId counterparty_port_id: ChannelId - height: BlockHeight port_id: PortId diff --git a/e2e/e2e/client.py b/e2e/e2e/client.py index e828a06700..d76b7dc6ca 100644 --- a/e2e/e2e/client.py +++ b/e2e/e2e/client.py @@ -9,7 +9,6 @@ class ClientCreated: client_id: ClientId client_type: ClientType consensus_height: Height - height: BlockHeight @dataclass @@ -33,7 +32,6 @@ class ClientUpdated: client_id: ClientId client_type: ClientType consensus_height: Height - height: BlockHeight @dataclass diff --git a/e2e/e2e/packet.py b/e2e/e2e/packet.py index 1572588165..80422c1d7b 100644 --- a/e2e/e2e/packet.py +++ b/e2e/e2e/packet.py @@ -18,7 +18,6 @@ class Packet: @dataclass class TxPacketSendRes: - height: BlockHeight packet: Packet @@ -61,7 +60,6 @@ def process(self, result: Any) -> TxPacketSendRes: @dataclass class TxPacketRecvRes: - height: BlockHeight packet: Packet ack: Hex @@ -86,7 +84,6 @@ def process(self, result: Any) -> TxPacketRecvRes: @dataclass class TxPacketTimeoutRes: - height: BlockHeight packet: Packet @@ -111,7 +108,6 @@ def process(self, result: Any) -> TxPacketTimeoutRes: @dataclass class TxPacketAckRes: - height: BlockHeight packet: Packet diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index cc73b6abd0..60d3add278 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -117,7 +117,7 @@ pub fn listen( let matching_events = batch .events .into_iter() - .filter(|e| event_match(e, filters)) + .filter(|e| event_match(&e.event, filters)) .collect_vec(); if matching_events.is_empty() { diff --git a/relayer-cli/src/commands/misbehaviour.rs b/relayer-cli/src/commands/misbehaviour.rs index 62e0dfc746..9358d67955 100644 --- a/relayer-cli/src/commands/misbehaviour.rs +++ b/relayer-cli/src/commands/misbehaviour.rs @@ -64,8 +64,8 @@ pub fn monitor_misbehaviour( while let Ok(event_batch) = subscription.recv() { match event_batch.deref() { Ok(event_batch) => { - for event in &event_batch.events { - match event { + for event_with_height in &event_batch.events { + match &event_with_height.event { IbcEvent::UpdateClient(update) => { debug!("{:?}", update); misbehaviour_handling( @@ -82,7 +82,7 @@ pub fn monitor_misbehaviour( IbcEvent::ClientMisbehaviour(ref _misbehaviour) => { // TODO - submit misbehaviour to the witnesses (our full node) - return Ok(Some(event.clone())); + return Ok(Some(event_with_height.event.clone())); } _ => {} @@ -128,7 +128,7 @@ fn misbehaviour_handling( })?; let client = ForeignClient::restore(client_id, chain, counterparty_chain); - let result = client.detect_misbehaviour_and_submit_evidence(update); + let result = client.detect_misbehaviour_and_submit_evidence(update.as_ref()); if let MisbehaviourResults::EvidenceSubmitted(events) = result { info!("evidence submission result {:?}", events); } diff --git a/relayer-cli/src/commands/tx/client.rs b/relayer-cli/src/commands/tx/client.rs index 59430e1a53..84784a2038 100644 --- a/relayer-cli/src/commands/tx/client.rs +++ b/relayer-cli/src/commands/tx/client.rs @@ -13,6 +13,7 @@ use ibc_relayer::chain::requests::{ IncludeProof, PageRequest, QueryClientStateRequest, QueryClientStatesRequest, QueryHeight, }; use ibc_relayer::config::Config; +use ibc_relayer::event::IbcEventWithHeight; use ibc_relayer::foreign_client::{CreateOptions, ForeignClient}; use tendermint_light_client_verifier::types::TrustThreshold; use tracing::debug; @@ -93,12 +94,12 @@ impl Runnable for TxCreateClientCmd { }; // Trigger client creation via the "build" interface, so that we obtain the resulting event - let res: Result = client + let res: Result = client .build_create_client_and_send(options) .map_err(Error::foreign_client); match res { - Ok(receipt) => Output::success(receipt).exit(), + Ok(receipt) => Output::success(receipt.event).exit(), Err(e) => Output::error(format!("{}", e)).exit(), } } diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 6cecf1d3a4..e50c21e7e2 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -52,8 +52,6 @@ use ibc::{ }; use ibc_proto::cosmos::staking::v1beta1::Params as StakingParams; -use crate::account::Balance; -use crate::chain::client::ClientSettings; use crate::chain::cosmos::batch::{ send_batched_messages_and_wait_check_tx, send_batched_messages_and_wait_commit, }; @@ -67,7 +65,6 @@ use crate::chain::cosmos::query::tx::query_txs; use crate::chain::cosmos::query::{abci_query, fetch_version_specs, packet_query, QueryResponse}; use crate::chain::cosmos::types::account::Account; use crate::chain::cosmos::types::config::TxConfig; -use crate::chain::cosmos::types::events::channel as channel_events; use crate::chain::cosmos::types::gas::{default_gas_from_config, max_gas_from_config}; use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck}; use crate::chain::tracking::TrackedMsgs; @@ -78,6 +75,8 @@ use crate::event::monitor::{EventMonitor, EventReceiver, TxMonitorCmd}; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::tendermint::LightClient as TmLightClient; use crate::light_client::{LightClient, Verified}; +use crate::{account::Balance, event::IbcEventWithHeight}; +use crate::{chain::client::ClientSettings, event::ibc_event_try_from_abci_event}; use super::requests::{ IncludeProof, QueryBlockRequest, QueryChannelClientStateRequest, QueryChannelRequest, @@ -408,7 +407,7 @@ impl CosmosSdkChain { async fn do_send_messages_and_wait_commit( &mut self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); let _span = @@ -601,7 +600,7 @@ impl ChainEndpoint for CosmosSdkChain { fn send_messages_and_wait_commit( &mut self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { let runtime = self.rt.clone(); runtime.block_on(self.do_send_messages_and_wait_commit(tracked_msgs)) @@ -1452,7 +1451,7 @@ impl ChainEndpoint for CosmosSdkChain { /// Therefore, for packets we perform one tx_search for each sequence. /// Alternatively, a single query for all packets could be performed but it would return all /// packets ever sent. - fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { crate::time!("query_txs"); crate::telemetry!(query, self.id(), "query_txs"); @@ -1653,7 +1652,7 @@ fn filter_matching_event( return None; } - let ibc_event = channel_events::try_from_tx(&event)?; + let ibc_event = ibc_event_try_from_abci_event(&event).ok()?; match ibc_event { IbcEvent::SendPacket(ref send_ev) if matches_packet(request, seq, &send_ev.packet) => { Some(ibc_event) diff --git a/relayer/src/chain/cosmos/batch.rs b/relayer/src/chain/cosmos/batch.rs index bac7fba586..9118d27152 100644 --- a/relayer/src/chain/cosmos/batch.rs +++ b/relayer/src/chain/cosmos/batch.rs @@ -1,6 +1,7 @@ use core::mem; use ibc::events::IbcEvent; +use ibc::Height; use ibc_proto::google::protobuf::Any; use prost::Message; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; @@ -12,6 +13,7 @@ use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult}; use crate::chain::cosmos::wait::wait_for_block_commits; use crate::config::types::{MaxMsgNum, MaxTxSize, Memo}; use crate::error::Error; +use crate::event::IbcEventWithHeight; use crate::keyring::KeyEntry; pub async fn send_batched_messages_and_wait_commit( @@ -22,7 +24,7 @@ pub async fn send_batched_messages_and_wait_commit( account: &mut Account, tx_memo: &Memo, messages: Vec, -) -> Result, Error> { +) -> Result, Error> { if messages.is_empty() { return Ok(Vec::new()); } @@ -106,10 +108,14 @@ async fn send_messages_as_batches( send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, batch).await?; if response.code.is_err() { - let events_per_tx = vec![IbcEvent::ChainError(format!( + // Note: we don't have any height information in this case. This hack will fix itself + // once we remove the `ChainError` event (which is not actually an event) + let height = Height::new(config.chain_id.version(), 1).unwrap(); + + let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!( "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", config.chain_id, response.hash, response.code, response.log - )); message_count]; + )), height); message_count]; let tx_sync_result = TxSyncResult { response, diff --git a/relayer/src/chain/cosmos/query/tx.rs b/relayer/src/chain/cosmos/query/tx.rs index 764ecd6f0a..27862f4469 100644 --- a/relayer/src/chain/cosmos/query/tx.rs +++ b/relayer/src/chain/cosmos/query/tx.rs @@ -13,6 +13,7 @@ use crate::chain::requests::{ QueryClientEventRequest, QueryHeight, QueryPacketEventDataRequest, QueryTxHash, QueryTxRequest, }; use crate::error::Error; +use crate::event::{ibc_event_try_from_abci_event, IbcEventWithHeight}; /// This function queries transactions for events matching certain criteria. /// 1. Client Update request - returns a vector with at most one update client event @@ -30,7 +31,7 @@ pub async fn query_txs( rpc_client: &HttpClient, rpc_address: &Url, request: QueryTxRequest, -) -> Result, Error> { +) -> Result, Error> { crate::time!("query_txs"); crate::telemetry!(query, chain_id, "query_txs"); @@ -38,7 +39,7 @@ pub async fn query_txs( QueryTxRequest::Packet(request) => { crate::time!("query_txs: query packet events"); - let mut result: Vec = vec![]; + let mut result: Vec = vec![]; for seq in &request.sequences { // query first (and only) Tx that includes the event specified in the query request @@ -144,7 +145,7 @@ fn update_client_from_tx_search_response( chain_id: &ChainId, request: &QueryClientEventRequest, response: TxResponse, -) -> Result, Error> { +) -> Result, Error> { let height = ICSHeight::new(chain_id.version(), u64::from(response.height)) .map_err(|_| Error::invalid_height_no_source())?; @@ -159,19 +160,16 @@ fn update_client_from_tx_search_response( .events .into_iter() .filter(|event| event.type_str == request.event_id.as_str()) - .flat_map(|event| events::client::try_from_tx(&event)) + .flat_map(|event| ibc_event_try_from_abci_event(&event).ok()) .flat_map(|event| match event { - IbcEvent::UpdateClient(mut update) => { - update.common.height = height; - Some(update) - } + IbcEvent::UpdateClient(update) => Some(update), _ => None, }) .find(|update| { update.common.client_id == request.client_id && update.common.consensus_height == request.consensus_height }) - .map(IbcEvent::UpdateClient)) + .map(|update| IbcEventWithHeight::new(IbcEvent::UpdateClient(update), height))) } // Extract the packet events from the query_txs RPC response. For any given @@ -186,7 +184,7 @@ fn packet_from_tx_search_response( request: &QueryPacketEventDataRequest, seq: Sequence, response: TxResponse, -) -> Result, Error> { +) -> Result, Error> { let height = ICSHeight::new(chain_id.version(), u64::from(response.height)) .map_err(|_| Error::invalid_height_no_source())?; @@ -200,7 +198,8 @@ fn packet_from_tx_search_response( .tx_result .events .into_iter() - .find_map(|ev| filter_matching_event(ev, request, seq))) + .find_map(|ev| filter_matching_event(ev, request, seq)) + .map(|ibc_event| IbcEventWithHeight::new(ibc_event, height))) } fn filter_matching_event( @@ -224,7 +223,7 @@ fn filter_matching_event( return None; } - let ibc_event = events::channel::try_from_tx(&event)?; + let ibc_event = ibc_event_try_from_abci_event(&event).ok()?; match ibc_event { IbcEvent::SendPacket(ref send_ev) if matches_packet(request, seq, &send_ev.packet) => { Some(ibc_event) @@ -260,17 +259,20 @@ pub async fn query_tx_response( fn all_ibc_events_from_tx_search_response( chain_id: &ChainId, response: TxResponse, -) -> Vec { +) -> Vec { let height = ICSHeight::new(chain_id.version(), u64::from(response.height)).unwrap(); let deliver_tx_result = response.tx_result; if deliver_tx_result.code.is_err() { // We can only return a single ChainError here because at this point // we have lost information about how many messages were in the transaction - vec![IbcEvent::ChainError(format!( - "deliver_tx for {} reports error: code={:?}, log={:?}", - response.hash, deliver_tx_result.code, deliver_tx_result.log - ))] + vec![IbcEventWithHeight::new( + IbcEvent::ChainError(format!( + "deliver_tx for {} reports error: code={:?}, log={:?}", + response.hash, deliver_tx_result.code, deliver_tx_result.log + )), + height, + )] } else { let result = deliver_tx_result .events diff --git a/relayer/src/chain/cosmos/types/events/channel.rs b/relayer/src/chain/cosmos/types/events/channel.rs index 103f7aba1b..9ead1a2783 100644 --- a/relayer/src/chain/cosmos/types/events/channel.rs +++ b/relayer/src/chain/cosmos/types/events/channel.rs @@ -4,175 +4,19 @@ use ibc::core::ics02_client::height::HeightErrorDetail; use ibc::core::ics04_channel::error::Error; use ibc::core::ics04_channel::events::{ AcknowledgePacket, Attributes, CloseConfirm, CloseInit, EventType, OpenAck, OpenConfirm, - OpenInit, OpenTry, SendPacket, TimeoutPacket, WriteAcknowledgement, CHANNEL_ID_ATTRIBUTE_KEY, - CONNECTION_ID_ATTRIBUTE_KEY, COUNTERPARTY_CHANNEL_ID_ATTRIBUTE_KEY, - COUNTERPARTY_PORT_ID_ATTRIBUTE_KEY, PKT_ACK_ATTRIBUTE_KEY, PKT_DATA_ATTRIBUTE_KEY, - PKT_DST_CHANNEL_ATTRIBUTE_KEY, PKT_DST_PORT_ATTRIBUTE_KEY, PKT_SEQ_ATTRIBUTE_KEY, - PKT_SRC_CHANNEL_ATTRIBUTE_KEY, PKT_SRC_PORT_ATTRIBUTE_KEY, PKT_TIMEOUT_HEIGHT_ATTRIBUTE_KEY, - PKT_TIMEOUT_TIMESTAMP_ATTRIBUTE_KEY, PORT_ID_ATTRIBUTE_KEY, + OpenInit, OpenTry, SendPacket, TimeoutPacket, WriteAcknowledgement, PKT_ACK_ATTRIBUTE_KEY, + PKT_DATA_ATTRIBUTE_KEY, PKT_DST_CHANNEL_ATTRIBUTE_KEY, PKT_DST_PORT_ATTRIBUTE_KEY, + PKT_SEQ_ATTRIBUTE_KEY, PKT_SRC_CHANNEL_ATTRIBUTE_KEY, PKT_SRC_PORT_ATTRIBUTE_KEY, + PKT_TIMEOUT_HEIGHT_ATTRIBUTE_KEY, PKT_TIMEOUT_TIMESTAMP_ATTRIBUTE_KEY, }; use ibc::core::ics04_channel::events::{ReceivePacket, TimeoutOnClosePacket}; use ibc::core::ics04_channel::packet::Packet; use ibc::core::ics04_channel::timeout::TimeoutHeight; -use ibc::events::{Error as EventError, IbcEvent, IbcEventType}; +use ibc::events::Error as EventError; use ibc::Height; -use tendermint::abci::Event as AbciEvent; - -pub fn try_from_tx(event: &AbciEvent) -> Option { - match event.type_str.parse() { - Ok(IbcEventType::OpenInitChannel) => extract_attributes_from_tx(event) - .map(OpenInit::try_from) - .map(|res| res.ok().map(IbcEvent::OpenInitChannel)) - .ok() - .flatten(), - Ok(IbcEventType::OpenTryChannel) => extract_attributes_from_tx(event) - .map(OpenTry::try_from) - .map(|res| res.ok().map(IbcEvent::OpenTryChannel)) - .ok() - .flatten(), - Ok(IbcEventType::OpenAckChannel) => extract_attributes_from_tx(event) - .map(OpenAck::try_from) - .map(|res| res.ok().map(IbcEvent::OpenAckChannel)) - .ok() - .flatten(), - Ok(IbcEventType::OpenConfirmChannel) => extract_attributes_from_tx(event) - .map(OpenConfirm::try_from) - .map(|res| res.ok().map(IbcEvent::OpenConfirmChannel)) - .ok() - .flatten(), - Ok(IbcEventType::CloseInitChannel) => extract_attributes_from_tx(event) - .map(CloseInit::try_from) - .map(|res| res.ok().map(IbcEvent::CloseInitChannel)) - .ok() - .flatten(), - Ok(IbcEventType::CloseConfirmChannel) => extract_attributes_from_tx(event) - .map(CloseConfirm::try_from) - .map(|res| res.ok().map(IbcEvent::CloseConfirmChannel)) - .ok() - .flatten(), - Ok(IbcEventType::SendPacket) => { - extract_packet_and_write_ack_from_tx(event) - .map(|(packet, write_ack)| { - // This event should not have a write ack. - debug_assert_eq!(write_ack.len(), 0); - IbcEvent::SendPacket(SendPacket { - height: Height::new(0, 1).unwrap(), - packet, - }) - }) - .ok() - } - Ok(IbcEventType::WriteAck) => extract_packet_and_write_ack_from_tx(event) - .map(|(packet, write_ack)| { - IbcEvent::WriteAcknowledgement(WriteAcknowledgement { - height: Height::new(0, 1).unwrap(), - packet, - ack: write_ack, - }) - }) - .ok(), - Ok(IbcEventType::AckPacket) => { - extract_packet_and_write_ack_from_tx(event) - .map(|(packet, write_ack)| { - // This event should not have a write ack. - debug_assert_eq!(write_ack.len(), 0); - IbcEvent::AcknowledgePacket(AcknowledgePacket { - height: Height::new(0, 1).unwrap(), - packet, - }) - }) - .ok() - } - Ok(IbcEventType::Timeout) => { - extract_packet_and_write_ack_from_tx(event) - .map(|(packet, write_ack)| { - // This event should not have a write ack. - debug_assert_eq!(write_ack.len(), 0); - IbcEvent::TimeoutPacket(TimeoutPacket { - height: Height::new(0, 1).unwrap(), - packet, - }) - }) - .ok() - } - _ => None, - } -} - -fn extract_attributes_from_tx(event: &AbciEvent) -> Result { - let mut attr = Attributes::default(); - - for tag in &event.attributes { - let key = tag.key.as_ref(); - let value = tag.value.as_ref(); - match key { - PORT_ID_ATTRIBUTE_KEY => attr.port_id = value.parse().map_err(Error::identifier)?, - CHANNEL_ID_ATTRIBUTE_KEY => { - attr.channel_id = value.parse().ok(); - } - CONNECTION_ID_ATTRIBUTE_KEY => { - attr.connection_id = value.parse().map_err(Error::identifier)?; - } - COUNTERPARTY_PORT_ID_ATTRIBUTE_KEY => { - attr.counterparty_port_id = value.parse().map_err(Error::identifier)?; - } - COUNTERPARTY_CHANNEL_ID_ATTRIBUTE_KEY => { - attr.counterparty_channel_id = value.parse().ok(); - } - _ => {} - } - } - - Ok(attr) -} - -fn extract_packet_and_write_ack_from_tx(event: &AbciEvent) -> Result<(Packet, Vec), Error> { - let mut packet = Packet::default(); - let mut write_ack: Vec = Vec::new(); - for tag in &event.attributes { - let key = tag.key.as_ref(); - let value = tag.value.as_ref(); - match key { - PKT_SRC_PORT_ATTRIBUTE_KEY => { - packet.source_port = value.parse().map_err(Error::identifier)?; - } - PKT_SRC_CHANNEL_ATTRIBUTE_KEY => { - packet.source_channel = value.parse().map_err(Error::identifier)?; - } - PKT_DST_PORT_ATTRIBUTE_KEY => { - packet.destination_port = value.parse().map_err(Error::identifier)?; - } - PKT_DST_CHANNEL_ATTRIBUTE_KEY => { - packet.destination_channel = value.parse().map_err(Error::identifier)?; - } - PKT_SEQ_ATTRIBUTE_KEY => { - packet.sequence = value - .parse::() - .map_err(|e| Error::invalid_string_as_sequence(value.to_string(), e))? - .into() - } - PKT_TIMEOUT_HEIGHT_ATTRIBUTE_KEY => { - packet.timeout_height = parse_timeout_height(value)?; - } - PKT_TIMEOUT_TIMESTAMP_ATTRIBUTE_KEY => { - packet.timeout_timestamp = value.parse().unwrap(); - } - PKT_DATA_ATTRIBUTE_KEY => { - packet.data = Vec::from(value.as_bytes()); - } - PKT_ACK_ATTRIBUTE_KEY => { - write_ack = Vec::from(value.as_bytes()); - } - _ => {} - } - } - - Ok((packet, write_ack)) -} fn extract_attributes(object: &RawObject<'_>, namespace: &str) -> Result { Ok(Attributes { - height: object.height, port_id: extract_attribute(object, &format!("{}.port_id", namespace))? .parse() .map_err(EventError::parse)?, @@ -222,13 +66,12 @@ macro_rules! impl_try_from_raw_obj_for_packet { type Error = EventError; fn try_from(obj: RawObject<'_>) -> Result { - let height = obj.height; let data_str: String = extract_attribute(&obj, &format!("{}.{}", obj.action, PKT_DATA_ATTRIBUTE_KEY))?; let mut packet = Packet::try_from(obj)?; packet.data = Vec::from(data_str.as_str().as_bytes()); - Ok(Self { height, packet }) + Ok(Self { packet }) } })+ }; @@ -246,7 +89,6 @@ impl TryFrom> for WriteAcknowledgement { type Error = EventError; fn try_from(obj: RawObject<'_>) -> Result { - let height = obj.height; let data_str: String = extract_attribute(&obj, &format!("{}.{}", obj.action, PKT_DATA_ATTRIBUTE_KEY))?; let ack = extract_attribute(&obj, &format!("{}.{}", obj.action, PKT_ACK_ATTRIBUTE_KEY))? @@ -255,11 +97,7 @@ impl TryFrom> for WriteAcknowledgement { let mut packet = Packet::try_from(obj)?; packet.data = Vec::from(data_str.as_str().as_bytes()); - Ok(Self { - height, - packet, - ack, - }) + Ok(Self { packet, ack }) } } @@ -366,115 +204,3 @@ pub fn extract_attribute(object: &RawObject<'_>, key: &str) -> Result, key: &str) -> Option { object.events.get(key).map(|tags| tags[object.idx].clone()) } - -#[cfg(test)] -mod tests { - use ibc::core::ics04_channel::packet::Sequence; - use ibc::timestamp::Timestamp; - - use super::*; - - #[test] - fn channel_event_to_abci_event() { - let attributes = Attributes { - height: Height::new(0, 1).unwrap(), - port_id: "test_port".parse().unwrap(), - channel_id: Some("channel-0".parse().unwrap()), - connection_id: "test_connection".parse().unwrap(), - counterparty_port_id: "counterparty_test_port".parse().unwrap(), - counterparty_channel_id: Some("channel-1".parse().unwrap()), - }; - let mut abci_events = vec![]; - let open_init = OpenInit::try_from(attributes.clone()).unwrap(); - abci_events.push(AbciEvent::from(open_init.clone())); - let open_try = OpenTry::try_from(attributes.clone()).unwrap(); - abci_events.push(AbciEvent::from(open_try.clone())); - let open_ack = OpenAck::try_from(attributes.clone()).unwrap(); - abci_events.push(AbciEvent::from(open_ack.clone())); - let open_confirm = OpenConfirm::try_from(attributes.clone()).unwrap(); - abci_events.push(AbciEvent::from(open_confirm.clone())); - let close_init = CloseInit::try_from(attributes.clone()).unwrap(); - abci_events.push(AbciEvent::from(close_init.clone())); - let close_confirm = CloseConfirm::try_from(attributes).unwrap(); - abci_events.push(AbciEvent::from(close_confirm.clone())); - - for event in abci_events { - match try_from_tx(&event) { - Some(e) => match e { - IbcEvent::OpenInitChannel(e) => { - assert_eq!(Attributes::from(e), open_init.clone().into()) - } - IbcEvent::OpenTryChannel(e) => { - assert_eq!(Attributes::from(e), open_try.clone().into()) - } - IbcEvent::OpenAckChannel(e) => { - assert_eq!(Attributes::from(e), open_ack.clone().into()) - } - IbcEvent::OpenConfirmChannel(e) => { - assert_eq!(Attributes::from(e), open_confirm.clone().into()) - } - IbcEvent::CloseInitChannel(e) => { - assert_eq!(Attributes::from(e), close_init.clone().into()) - } - IbcEvent::CloseConfirmChannel(e) => { - assert_eq!(Attributes::from(e), close_confirm.clone().into()) - } - _ => panic!("unexpected event type"), - }, - None => panic!("converted event was wrong"), - } - } - } - - #[test] - fn packet_event_to_abci_event() { - let packet = Packet { - sequence: Sequence::from(10), - source_port: "a_test_port".parse().unwrap(), - source_channel: "channel-0".parse().unwrap(), - destination_port: "b_test_port".parse().unwrap(), - destination_channel: "channel-1".parse().unwrap(), - data: "test_data".as_bytes().to_vec(), - timeout_height: Height::new(1, 10).unwrap().into(), - timeout_timestamp: Timestamp::now(), - }; - let mut abci_events = vec![]; - let send_packet = SendPacket { - height: Height::new(0, 1).unwrap(), - packet: packet.clone(), - }; - abci_events.push(AbciEvent::try_from(send_packet.clone()).unwrap()); - let write_ack = WriteAcknowledgement { - height: Height::new(0, 1).unwrap(), - packet: packet.clone(), - ack: "test_ack".as_bytes().to_vec(), - }; - abci_events.push(AbciEvent::try_from(write_ack.clone()).unwrap()); - let ack_packet = AcknowledgePacket { - height: Height::new(0, 1).unwrap(), - packet: packet.clone(), - }; - abci_events.push(AbciEvent::try_from(ack_packet.clone()).unwrap()); - let timeout_packet = TimeoutPacket { - height: Height::new(0, 1).unwrap(), - packet, - }; - abci_events.push(AbciEvent::try_from(timeout_packet.clone()).unwrap()); - - for event in abci_events { - match try_from_tx(&event) { - Some(e) => match e { - IbcEvent::SendPacket(e) => assert_eq!(e.packet, send_packet.packet), - IbcEvent::WriteAcknowledgement(e) => { - assert_eq!(e.packet, write_ack.packet); - assert_eq!(e.ack, write_ack.ack); - } - IbcEvent::AcknowledgePacket(e) => assert_eq!(e.packet, ack_packet.packet), - IbcEvent::TimeoutPacket(e) => assert_eq!(e.packet, timeout_packet.packet), - _ => panic!("unexpected event type"), - }, - None => panic!("converted event was wrong"), - } - } - } -} diff --git a/relayer/src/chain/cosmos/types/events/client.rs b/relayer/src/chain/cosmos/types/events/client.rs deleted file mode 100644 index 4f9bb1d9a6..0000000000 --- a/relayer/src/chain/cosmos/types/events/client.rs +++ /dev/null @@ -1,125 +0,0 @@ -use ibc::core::ics02_client::error::Error; -use ibc::core::ics02_client::events::{ - Attributes, ClientMisbehaviour, CreateClient, UpdateClient, UpgradeClient, - CLIENT_ID_ATTRIBUTE_KEY, CLIENT_TYPE_ATTRIBUTE_KEY, CONSENSUS_HEIGHT_ATTRIBUTE_KEY, - HEADER_ATTRIBUTE_KEY, HEIGHT_ATTRIBUTE_KEY, -}; -use ibc::core::ics02_client::header::AnyHeader; -use ibc::events::{IbcEvent, IbcEventType}; -use tendermint::abci::Event as AbciEvent; - -pub fn try_from_tx(event: &AbciEvent) -> Option { - match event.type_str.parse() { - Ok(IbcEventType::CreateClient) => extract_attributes_from_tx(event) - .map(CreateClient) - .map(IbcEvent::CreateClient) - .ok(), - Ok(IbcEventType::UpdateClient) => match extract_attributes_from_tx(event) { - Ok(attributes) => Some(IbcEvent::UpdateClient(UpdateClient { - common: attributes, - header: extract_header_from_tx(event).ok(), - })), - Err(_) => None, - }, - Ok(IbcEventType::ClientMisbehaviour) => extract_attributes_from_tx(event) - .map(ClientMisbehaviour) - .map(IbcEvent::ClientMisbehaviour) - .ok(), - Ok(IbcEventType::UpgradeClient) => extract_attributes_from_tx(event) - .map(UpgradeClient) - .map(IbcEvent::UpgradeClient) - .ok(), - _ => None, - } -} - -fn extract_attributes_from_tx(event: &AbciEvent) -> Result { - let mut attr = Attributes::default(); - - for tag in &event.attributes { - let key = tag.key.as_ref(); - let value = tag.value.as_ref(); - match key { - HEIGHT_ATTRIBUTE_KEY => { - attr.height = value - .parse() - .map_err(|e| Error::invalid_string_as_height(value.to_string(), e))? - } - CLIENT_ID_ATTRIBUTE_KEY => { - attr.client_id = value.parse().map_err(Error::invalid_client_identifier)? - } - CLIENT_TYPE_ATTRIBUTE_KEY => { - attr.client_type = value - .parse() - .map_err(|_| Error::unknown_client_type(value.to_string()))? - } - CONSENSUS_HEIGHT_ATTRIBUTE_KEY => { - attr.consensus_height = value - .parse() - .map_err(|e| Error::invalid_string_as_height(value.to_string(), e))? - } - _ => {} - } - } - - Ok(attr) -} - -pub fn extract_header_from_tx(event: &AbciEvent) -> Result { - for tag in &event.attributes { - let key = tag.key.as_ref(); - let value = tag.value.as_ref(); - if key == HEADER_ATTRIBUTE_KEY { - return AnyHeader::decode_from_string(value); - } - } - Err(Error::missing_raw_header()) -} - -#[cfg(test)] -mod tests { - use ibc::core::ics02_client::client_type::ClientType; - use ibc::core::ics02_client::header::Header; - use ibc::mock::header::MockHeader; - use ibc::Height; - - use super::*; - - #[test] - fn client_event_to_abci_event() { - let height = Height::new(1, 1).unwrap(); - let attributes = Attributes { - height, - client_id: "test_client".parse().unwrap(), - client_type: ClientType::Tendermint, - consensus_height: height, - }; - let mut abci_events = vec![]; - let create_client = CreateClient::from(attributes.clone()); - abci_events.push(AbciEvent::from(create_client.clone())); - let client_misbehaviour = ClientMisbehaviour::from(attributes.clone()); - abci_events.push(AbciEvent::from(client_misbehaviour.clone())); - let upgrade_client = UpgradeClient::from(attributes.clone()); - abci_events.push(AbciEvent::from(upgrade_client.clone())); - let mut update_client = UpdateClient::from(attributes); - let header = MockHeader::new(height).wrap_any(); - update_client.header = Some(header); - abci_events.push(AbciEvent::from(update_client.clone())); - - for event in abci_events { - match try_from_tx(&event) { - Some(e) => match e { - IbcEvent::CreateClient(e) => assert_eq!(e.0, create_client.0), - IbcEvent::ClientMisbehaviour(e) => assert_eq!(e.0, client_misbehaviour.0), - IbcEvent::UpgradeClient(e) => assert_eq!(e.0, upgrade_client.0), - IbcEvent::UpdateClient(e) => { - assert_eq!(e.common, update_client.common); - assert_eq!(e.header, update_client.header); - } - _ => panic!("unexpected event type"), - }, - None => panic!("converted event was wrong"), - } - } - } -} diff --git a/relayer/src/chain/cosmos/types/events/connection.rs b/relayer/src/chain/cosmos/types/events/connection.rs deleted file mode 100644 index 33a56a8090..0000000000 --- a/relayer/src/chain/cosmos/types/events/connection.rs +++ /dev/null @@ -1,104 +0,0 @@ -use ibc::core::ics02_client::error::Error as Ics02Error; -use ibc::core::ics03_connection::error::Error; -use ibc::core::ics03_connection::events::{ - Attributes, OpenAck, OpenConfirm, OpenInit, OpenTry, CLIENT_ID_ATTRIBUTE_KEY, - CONN_ID_ATTRIBUTE_KEY, COUNTERPARTY_CLIENT_ID_ATTRIBUTE_KEY, - COUNTERPARTY_CONN_ID_ATTRIBUTE_KEY, HEIGHT_ATTRIBUTE_KEY, -}; -use ibc::events::{IbcEvent, IbcEventType}; -use tendermint::abci::Event as AbciEvent; - -pub fn try_from_tx(event: &AbciEvent) -> Option { - match event.type_str.parse() { - Ok(IbcEventType::OpenInitConnection) => extract_attributes_from_tx(event) - .map(OpenInit::from) - .map(IbcEvent::OpenInitConnection) - .ok(), - Ok(IbcEventType::OpenTryConnection) => extract_attributes_from_tx(event) - .map(OpenTry::from) - .map(IbcEvent::OpenTryConnection) - .ok(), - Ok(IbcEventType::OpenAckConnection) => extract_attributes_from_tx(event) - .map(OpenAck::from) - .map(IbcEvent::OpenAckConnection) - .ok(), - Ok(IbcEventType::OpenConfirmConnection) => extract_attributes_from_tx(event) - .map(OpenConfirm::from) - .map(IbcEvent::OpenConfirmConnection) - .ok(), - _ => None, - } -} - -fn extract_attributes_from_tx(event: &AbciEvent) -> Result { - let mut attr = Attributes::default(); - - for tag in &event.attributes { - let key = tag.key.as_ref(); - let value = tag.value.as_ref(); - match key { - HEIGHT_ATTRIBUTE_KEY => { - attr.height = value.parse().map_err(|e| { - Error::ics02_client(Ics02Error::invalid_string_as_height(value.to_string(), e)) - })?; - } - CONN_ID_ATTRIBUTE_KEY => { - attr.connection_id = value.parse().ok(); - } - CLIENT_ID_ATTRIBUTE_KEY => { - attr.client_id = value.parse().map_err(Error::invalid_identifier)?; - } - COUNTERPARTY_CONN_ID_ATTRIBUTE_KEY => { - attr.counterparty_connection_id = value.parse().ok(); - } - COUNTERPARTY_CLIENT_ID_ATTRIBUTE_KEY => { - attr.counterparty_client_id = value.parse().map_err(Error::invalid_identifier)?; - } - _ => {} - } - } - - Ok(attr) -} - -#[cfg(test)] -mod test { - use ibc::core::ics03_connection::events::Attributes; - use ibc::Height; - - use super::*; - - #[test] - fn connection_event_to_abci_event() { - let height = Height::new(1, 1).unwrap(); - let attributes = Attributes { - height, - connection_id: Some("test_connection".parse().unwrap()), - client_id: "test_client".parse().unwrap(), - counterparty_connection_id: Some("counterparty_test_conn".parse().unwrap()), - counterparty_client_id: "counterparty_test_client".parse().unwrap(), - }; - let mut abci_events = vec![]; - let open_init = OpenInit::from(attributes.clone()); - abci_events.push(AbciEvent::from(open_init.clone())); - let open_try = OpenTry::from(attributes.clone()); - abci_events.push(AbciEvent::from(open_try.clone())); - let open_ack = OpenAck::from(attributes.clone()); - abci_events.push(AbciEvent::from(open_ack.clone())); - let open_confirm = OpenConfirm::from(attributes); - abci_events.push(AbciEvent::from(open_confirm.clone())); - - for event in abci_events { - match try_from_tx(&event) { - Some(e) => match e { - IbcEvent::OpenInitConnection(e) => assert_eq!(e, open_init), - IbcEvent::OpenTryConnection(e) => assert_eq!(e, open_try), - IbcEvent::OpenAckConnection(e) => assert_eq!(e, open_ack), - IbcEvent::OpenConfirmConnection(e) => assert_eq!(e, open_confirm), - _ => panic!("unexpected event type"), - }, - None => panic!("converted event was wrong"), - } - } - } -} diff --git a/relayer/src/chain/cosmos/types/events/mod.rs b/relayer/src/chain/cosmos/types/events/mod.rs index 80a7fac416..0bded88c61 100644 --- a/relayer/src/chain/cosmos/types/events/mod.rs +++ b/relayer/src/chain/cosmos/types/events/mod.rs @@ -1,23 +1,12 @@ -use ibc::events::IbcEvent; use ibc::Height; use tendermint::abci::Event as AbciEvent; +use crate::event::{ibc_event_try_from_abci_event, IbcEventWithHeight}; + pub mod channel; -pub mod client; -pub mod connection; -pub fn from_tx_response_event(height: Height, event: &AbciEvent) -> Option { - // Return the first hit we find - if let Some(mut client_res) = client::try_from_tx(event) { - client_res.set_height(height); - Some(client_res) - } else if let Some(mut conn_res) = connection::try_from_tx(event) { - conn_res.set_height(height); - Some(conn_res) - } else if let Some(mut chan_res) = channel::try_from_tx(event) { - chan_res.set_height(height); - Some(chan_res) - } else { - None - } +pub fn from_tx_response_event(height: Height, event: &AbciEvent) -> Option { + ibc_event_try_from_abci_event(event) + .ok() + .map(|ibc_event| IbcEventWithHeight::new(ibc_event, height)) } diff --git a/relayer/src/chain/cosmos/types/tx.rs b/relayer/src/chain/cosmos/types/tx.rs index 6faf2ecde8..e51184baf0 100644 --- a/relayer/src/chain/cosmos/types/tx.rs +++ b/relayer/src/chain/cosmos/types/tx.rs @@ -1,7 +1,8 @@ -use ibc::events::IbcEvent; use ibc_proto::cosmos::tx::v1beta1::{AuthInfo, TxBody}; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use crate::event::IbcEventWithHeight; + pub struct SignedTx { pub body: TxBody, pub body_bytes: Vec, @@ -19,6 +20,6 @@ pub struct TxSyncResult { // the broadcast_tx_sync response pub response: Response, // the events generated by a Tx once executed - pub events: Vec, + pub events: Vec, pub status: TxStatus, } diff --git a/relayer/src/chain/cosmos/wait.rs b/relayer/src/chain/cosmos/wait.rs index c39dfe72af..917af94fd4 100644 --- a/relayer/src/chain/cosmos/wait.rs +++ b/relayer/src/chain/cosmos/wait.rs @@ -12,6 +12,7 @@ use crate::chain::cosmos::query::tx::query_tx_response; use crate::chain::cosmos::types::events::from_tx_response_event; use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult}; use crate::error::Error; +use crate::event::IbcEventWithHeight; const WAIT_BACKOFF: Duration = Duration::from_millis(300); @@ -77,22 +78,24 @@ async fn update_tx_sync_result( if let Some(response) = response { tx_sync_result.status = TxStatus::ReceivedResponse; + let height = Height::new(chain_id.version(), u64::from(response.height)).unwrap(); if response.tx_result.code.is_err() { tx_sync_result.events = vec![ - IbcEvent::ChainError(format!( - "deliver_tx for {} reports error: code={:?}, log={:?}", - response.hash, response.tx_result.code, response.tx_result.log - )); + IbcEventWithHeight::new( + IbcEvent::ChainError(format!( + "deliver_tx for {} reports error: code={:?}, log={:?}", + response.hash, response.tx_result.code, response.tx_result.log + )), + height + ); message_count ]; } else { - let height = Height::new(chain_id.version(), u64::from(response.height)).unwrap(); - tx_sync_result.events = response .tx_result .events .iter() - .flat_map(|event| from_tx_response_event(height, event).into_iter()) + .flat_map(|event| from_tx_response_event(height, event)) .collect::>(); } } diff --git a/relayer/src/chain/endpoint.rs b/relayer/src/chain/endpoint.rs index 7efaf0b613..3647660e9c 100644 --- a/relayer/src/chain/endpoint.rs +++ b/relayer/src/chain/endpoint.rs @@ -43,6 +43,7 @@ use crate::connection::ConnectionMsgType; use crate::denom::DenomTrace; use crate::error::{Error, QUERY_PROOF_EXPECT_MSG}; use crate::event::monitor::{EventReceiver, TxMonitorCmd}; +use crate::event::IbcEventWithHeight; use crate::keyring::{KeyEntry, KeyRing}; use super::requests::{ @@ -127,7 +128,7 @@ pub trait ChainEndpoint: Sized { fn send_messages_and_wait_commit( &mut self, tracked_msgs: TrackedMsgs, - ) -> Result, Error>; + ) -> Result, Error>; /// Sends one or more transactions with `msgs` to chain. /// Non-blocking alternative to `send_messages_and_wait_commit` interface. @@ -335,7 +336,7 @@ pub trait ChainEndpoint: Sized { include_proof: IncludeProof, ) -> Result<(Sequence, Option), Error>; - fn query_txs(&self, request: QueryTxRequest) -> Result, Error>; + fn query_txs(&self, request: QueryTxRequest) -> Result, Error>; fn query_blocks( &self, diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 8714cc3058..20d1f1bb84 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -36,7 +36,10 @@ use crate::{ connection::ConnectionMsgType, denom::DenomTrace, error::Error, - event::monitor::{EventBatch, Result as MonitorResult}, + event::{ + monitor::{EventBatch, Result as MonitorResult}, + IbcEventWithHeight, + }, keyring::KeyEntry, }; @@ -121,7 +124,7 @@ pub enum ChainRequest { SendMessagesAndWaitCommit { tracked_msgs: TrackedMsgs, - reply_to: ReplyTo>, + reply_to: ReplyTo>, }, SendMessagesAndWaitCheckTx { @@ -338,7 +341,7 @@ pub enum ChainRequest { QueryPacketEventDataFromTxs { request: QueryTxRequest, - reply_to: ReplyTo>, + reply_to: ReplyTo>, }, QueryPacketEventDataFromBlocks { @@ -372,7 +375,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { fn send_messages_and_wait_commit( &self, tracked_msgs: TrackedMsgs, - ) -> Result, Error>; + ) -> Result, Error>; /// Submit messages asynchronously. /// Does not block waiting on the chain to produce the @@ -626,7 +629,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { request: QueryUnreceivedAcksRequest, ) -> Result, Error>; - fn query_txs(&self, request: QueryTxRequest) -> Result, Error>; + fn query_txs(&self, request: QueryTxRequest) -> Result, Error>; fn query_blocks( &self, diff --git a/relayer/src/chain/handle/base.rs b/relayer/src/chain/handle/base.rs index 01178638c4..8a2390ce6c 100644 --- a/relayer/src/chain/handle/base.rs +++ b/relayer/src/chain/handle/base.rs @@ -47,6 +47,7 @@ use crate::{ connection::ConnectionMsgType, denom::DenomTrace, error::Error, + event::IbcEventWithHeight, keyring::KeyEntry, }; @@ -109,7 +110,7 @@ impl ChainHandle for BaseChainHandle { fn send_messages_and_wait_commit( &self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit { tracked_msgs, reply_to, @@ -459,7 +460,7 @@ impl ChainHandle for BaseChainHandle { self.send(|reply_to| ChainRequest::QueryUnreceivedAcknowledgement { request, reply_to }) } - fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { self.send(|reply_to| ChainRequest::QueryPacketEventDataFromTxs { request, reply_to }) } diff --git a/relayer/src/chain/handle/cache.rs b/relayer/src/chain/handle/cache.rs index 546be73d4d..f6a93e1b6b 100644 --- a/relayer/src/chain/handle/cache.rs +++ b/relayer/src/chain/handle/cache.rs @@ -44,6 +44,7 @@ use crate::config::ChainConfig; use crate::connection::ConnectionMsgType; use crate::denom::DenomTrace; use crate::error::Error; +use crate::event::IbcEventWithHeight; use crate::keyring::KeyEntry; use crate::telemetry; @@ -101,7 +102,7 @@ impl ChainHandle for CachingChainHandle { fn send_messages_and_wait_commit( &self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { self.inner().send_messages_and_wait_commit(tracked_msgs) } @@ -467,7 +468,7 @@ impl ChainHandle for CachingChainHandle { self.inner().query_unreceived_acknowledgements(request) } - fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { self.inner().query_txs(request) } diff --git a/relayer/src/chain/handle/counting.rs b/relayer/src/chain/handle/counting.rs index 21a6118879..22c9e68e95 100644 --- a/relayer/src/chain/handle/counting.rs +++ b/relayer/src/chain/handle/counting.rs @@ -43,6 +43,7 @@ use crate::chain::tracking::TrackedMsgs; use crate::config::ChainConfig; use crate::denom::DenomTrace; use crate::error::Error; +use crate::event::IbcEventWithHeight; use crate::util::lock::LockExt; use crate::{connection::ConnectionMsgType, keyring::KeyEntry}; @@ -119,7 +120,7 @@ impl ChainHandle for CountingChainHandle { fn send_messages_and_wait_commit( &self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { self.inc_metric("send_messages_and_wait_commit"); self.inner().send_messages_and_wait_commit(tracked_msgs) } @@ -450,7 +451,7 @@ impl ChainHandle for CountingChainHandle { self.inner().query_unreceived_acknowledgements(request) } - fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { self.inc_metric("query_txs"); self.inner().query_txs(request) } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index 3653a87c79..6eec7cfe06 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -40,6 +40,7 @@ use crate::config::ChainConfig; use crate::denom::DenomTrace; use crate::error::Error; use crate::event::monitor::{EventReceiver, EventSender, TxMonitorCmd}; +use crate::event::IbcEventWithHeight; use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::Verified; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; @@ -159,11 +160,14 @@ impl ChainEndpoint for MockChain { fn send_messages_and_wait_commit( &mut self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. let events = self.context.send(tracked_msgs.msgs).map_err(Error::ics18)?; - Ok(events) + Ok(events + .into_iter() + .map(|ev| IbcEventWithHeight::new(ev, Height::new(0, 1).unwrap())) + .collect()) } fn send_messages_and_wait_check_tx( @@ -351,7 +355,7 @@ impl ChainEndpoint for MockChain { unimplemented!() } - fn query_txs(&self, _request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, _request: QueryTxRequest) -> Result, Error> { unimplemented!() } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index c8f2096caf..3699fec38a 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -40,6 +40,7 @@ use crate::{ event::{ bus::EventBus, monitor::{EventBatch, EventReceiver, MonitorCmd, Result as MonitorResult, TxMonitorCmd}, + IbcEventWithHeight, }, keyring::KeyEntry, }; @@ -449,7 +450,7 @@ where fn send_messages_and_wait_commit( &mut self, tracked_msgs: TrackedMsgs, - reply_to: ReplyTo>, + reply_to: ReplyTo>, ) -> Result<(), Error> { let result = self.chain.send_messages_and_wait_commit(tracked_msgs); reply_to.send(result).map_err(Error::send) @@ -855,7 +856,7 @@ where fn query_txs( &self, request: QueryTxRequest, - reply_to: ReplyTo>, + reply_to: ReplyTo>, ) -> Result<(), Error> { let result = self.chain.query_txs(request); reply_to.send(result).map_err(Error::send) diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index f8895e0502..52dca0277a 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -762,7 +762,7 @@ impl Channel { } } - pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult { + pub fn step_event(&mut self, event: &IbcEvent, index: u64) -> RetryResult { let state = match event { IbcEvent::OpenInitChannel(_) => State::Init, IbcEvent::OpenTryChannel(_) => State::TryOpen, @@ -843,21 +843,21 @@ impl Channel { // Find the relevant event for channel open init let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenInitChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenInitChannel(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event("no chan init event was in the response".to_string()) })?; - match result { + match &result.event { IbcEvent::OpenInitChannel(_) => { info!("🎊 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } @@ -1029,21 +1029,21 @@ impl Channel { // Find the relevant event for channel open try let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenTryChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|events_with_height| { + matches!(events_with_height.event, IbcEvent::OpenTryChannel(_)) + || matches!(events_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event("no chan try event was in the response".to_string()) })?; - match result { + match &result.event { IbcEvent::OpenTryChannel(_) => { info!("🎊 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } @@ -1132,21 +1132,21 @@ impl Channel { // Find the relevant event for channel open ack let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenAckChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenAckChannel(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event("no chan ack event was in the response".to_string()) })?; - match result { + match &result.event { IbcEvent::OpenAckChannel(_) => { info!("🎊 {} => {:#?}\n", channel.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } @@ -1237,9 +1237,9 @@ impl Channel { // Find the relevant event for channel open confirm let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenConfirmChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenConfirmChannel(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event( @@ -1247,13 +1247,13 @@ impl Channel { ) })?; - match result { + match &result.event { IbcEvent::OpenConfirmChannel(_) => { info!("🎊 {} => {:#?}\n", channel.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } @@ -1309,21 +1309,21 @@ impl Channel { // Find the relevant event for channel close init let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::CloseInitChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::CloseInitChannel(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event("no chan init event was in the response".to_string()) })?; - match result { + match &result.event { IbcEvent::CloseInitChannel(_) => { info!("👋 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } @@ -1406,21 +1406,21 @@ impl Channel { // Find the relevant event for channel close confirm let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::CloseConfirmChannel(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::CloseConfirmChannel(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(|| { ChannelError::missing_event("no chan confirm event was in the response".to_string()) })?; - match result { + match &result.event { IbcEvent::CloseConfirmChannel(_) => { info!("👋 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e)), - _ => Err(ChannelError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ChannelError::tx_response(e.clone())), + _ => Err(ChannelError::invalid_event(result.event)), } } diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index e4c7948157..d24d580d28 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -182,7 +182,7 @@ impl Connection { pub fn restore_from_event( chain: ChainA, counterparty_chain: ChainB, - connection_open_event: IbcEvent, + connection_open_event: &IbcEvent, ) -> Result, ConnectionError> { let connection_event_attributes = connection_open_event .connection_attributes() @@ -749,7 +749,7 @@ impl Connection { } } - pub fn step_event(&mut self, event: IbcEvent, index: u64) -> RetryResult { + pub fn step_event(&mut self, event: &IbcEvent, index: u64) -> RetryResult { let state = match event { IbcEvent::OpenInitConnection(_) => State::Init, IbcEvent::OpenTryConnection(_) => State::TryOpen, @@ -900,20 +900,20 @@ impl Connection { // Find the relevant event for connection init let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenInitConnection(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenInitConnection(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(ConnectionError::missing_connection_init_event)?; // TODO - make chainError an actual error - match result { + match &result.event { IbcEvent::OpenInitConnection(_) => { info!("🥂 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e)), - _ => Err(ConnectionError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())), + _ => Err(ConnectionError::invalid_event(result.event)), } } @@ -1073,19 +1073,19 @@ impl Connection { // Find the relevant event for connection try transaction let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenTryConnection(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenTryConnection(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(ConnectionError::missing_connection_try_event)?; - match result { + match &result.event { IbcEvent::OpenTryConnection(_) => { info!("🥂 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e)), - _ => Err(ConnectionError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())), + _ => Err(ConnectionError::invalid_event(result.event)), } } @@ -1188,19 +1188,19 @@ impl Connection { // Find the relevant event for connection ack let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenAckConnection(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenAckConnection(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(ConnectionError::missing_connection_ack_event)?; - match result { + match &result.event { IbcEvent::OpenAckConnection(_) => { info!("🥂 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e)), - _ => Err(ConnectionError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())), + _ => Err(ConnectionError::invalid_event(result.event)), } } @@ -1276,19 +1276,19 @@ impl Connection { // Find the relevant event for connection confirm let result = events .into_iter() - .find(|event| { - matches!(event, IbcEvent::OpenConfirmConnection(_)) - || matches!(event, IbcEvent::ChainError(_)) + .find(|event_with_height| { + matches!(event_with_height.event, IbcEvent::OpenConfirmConnection(_)) + || matches!(event_with_height.event, IbcEvent::ChainError(_)) }) .ok_or_else(ConnectionError::missing_connection_confirm_event)?; - match result { + match &result.event { IbcEvent::OpenConfirmConnection(_) => { info!("🥂 {} => {:#?}\n", self.dst_chain().id(), result); - Ok(result) + Ok(result.event) } - IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e)), - _ => Err(ConnectionError::invalid_event(result)), + IbcEvent::ChainError(e) => Err(ConnectionError::tx_response(e.clone())), + _ => Err(ConnectionError::invalid_event(result.event)), } } diff --git a/relayer/src/event.rs b/relayer/src/event.rs index c5f87abc85..d78d858f99 100644 --- a/relayer/src/event.rs +++ b/relayer/src/event.rs @@ -1,3 +1,629 @@ +use core::fmt::{self, Display, Formatter}; +use ibc::{ + core::ics02_client::{ + error::Error as ClientError, + events::{self as client_events, Attributes as ClientAttributes}, + }, + core::{ + ics02_client::height::HeightErrorDetail, + ics04_channel::{ + error::Error as ChannelError, + events::{self as channel_events, Attributes as ChannelAttributes}, + packet::Packet, + timeout::TimeoutHeight, + }, + }, + core::{ + ics02_client::{events::HEADER_ATTRIBUTE_KEY, header::AnyHeader}, + ics03_connection::{ + error::Error as ConnectionError, + events::{self as connection_events, Attributes as ConnectionAttributes}, + }, + }, + events::{Error as IbcEventError, IbcEvent, IbcEventType}, + Height, +}; +use serde::Serialize; +use tendermint::abci::Event as AbciEvent; + pub mod bus; pub mod monitor; pub mod rpc; + +#[derive(Clone, Debug, Serialize)] +pub struct IbcEventWithHeight { + pub event: IbcEvent, + pub height: Height, +} + +impl IbcEventWithHeight { + pub fn new(event: IbcEvent, height: Height) -> Self { + Self { event, height } + } +} + +impl Display for IbcEventWithHeight { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{} at height {}", self.event, self.height) + } +} + +/// For use in debug messages +pub struct PrettyEvents<'a>(pub &'a [IbcEventWithHeight]); +impl<'a> Display for PrettyEvents<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + writeln!(f, "events:")?; + for v in self.0 { + writeln!(f, "\t{}", v)?; + } + Ok(()) + } +} + +/// Note: This function, as well as other helpers, are needed as a workaround to +/// Rust's orphan rule. That is, we want the AbciEvent -> IbcEvent to be defined +/// in the relayer crate, but can't because neither AbciEvent nor IbcEvent are +/// defined in this crate. Hence, we are forced to make an ad-hoc function for +/// it. +pub fn ibc_event_try_from_abci_event(abci_event: &AbciEvent) -> Result { + match abci_event.type_str.parse() { + Ok(IbcEventType::CreateClient) => Ok(IbcEvent::CreateClient( + create_client_try_from_abci_event(abci_event).map_err(IbcEventError::client)?, + )), + Ok(IbcEventType::UpdateClient) => Ok(IbcEvent::UpdateClient( + update_client_try_from_abci_event(abci_event).map_err(IbcEventError::client)?, + )), + Ok(IbcEventType::UpgradeClient) => Ok(IbcEvent::UpgradeClient( + upgrade_client_try_from_abci_event(abci_event).map_err(IbcEventError::client)?, + )), + Ok(IbcEventType::ClientMisbehaviour) => Ok(IbcEvent::ClientMisbehaviour( + client_misbehaviour_try_from_abci_event(abci_event).map_err(IbcEventError::client)?, + )), + Ok(IbcEventType::OpenInitConnection) => Ok(IbcEvent::OpenInitConnection( + connection_open_init_try_from_abci_event(abci_event) + .map_err(IbcEventError::connection)?, + )), + Ok(IbcEventType::OpenTryConnection) => Ok(IbcEvent::OpenTryConnection( + connection_open_try_try_from_abci_event(abci_event) + .map_err(IbcEventError::connection)?, + )), + Ok(IbcEventType::OpenAckConnection) => Ok(IbcEvent::OpenAckConnection( + connection_open_ack_try_from_abci_event(abci_event) + .map_err(IbcEventError::connection)?, + )), + Ok(IbcEventType::OpenConfirmConnection) => Ok(IbcEvent::OpenConfirmConnection( + connection_open_confirm_try_from_abci_event(abci_event) + .map_err(IbcEventError::connection)?, + )), + Ok(IbcEventType::OpenInitChannel) => Ok(IbcEvent::OpenInitChannel( + channel_open_init_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::OpenTryChannel) => Ok(IbcEvent::OpenTryChannel( + channel_open_try_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::OpenAckChannel) => Ok(IbcEvent::OpenAckChannel( + channel_open_ack_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::OpenConfirmChannel) => Ok(IbcEvent::OpenConfirmChannel( + channel_open_confirm_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::CloseInitChannel) => Ok(IbcEvent::CloseInitChannel( + channel_close_init_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::CloseConfirmChannel) => Ok(IbcEvent::CloseConfirmChannel( + channel_close_confirm_try_from_abci_event(abci_event) + .map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::SendPacket) => Ok(IbcEvent::SendPacket( + send_packet_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::WriteAck) => Ok(IbcEvent::WriteAcknowledgement( + write_acknowledgement_try_from_abci_event(abci_event) + .map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::AckPacket) => Ok(IbcEvent::AcknowledgePacket( + acknowledge_packet_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + Ok(IbcEventType::Timeout) => Ok(IbcEvent::TimeoutPacket( + timeout_packet_try_from_abci_event(abci_event).map_err(IbcEventError::channel)?, + )), + _ => Err(IbcEventError::unsupported_abci_event( + abci_event.type_str.to_owned(), + )), + } +} + +pub fn create_client_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + client_extract_attributes_from_tx(abci_event).map(client_events::CreateClient) +} + +pub fn update_client_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + client_extract_attributes_from_tx(abci_event).map(|attributes| client_events::UpdateClient { + common: attributes, + header: extract_header_from_tx(abci_event).ok(), + }) +} + +pub fn upgrade_client_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + client_extract_attributes_from_tx(abci_event).map(client_events::UpgradeClient) +} + +pub fn client_misbehaviour_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + client_extract_attributes_from_tx(abci_event).map(client_events::ClientMisbehaviour) +} + +pub fn connection_open_init_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + connection_extract_attributes_from_tx(abci_event).map(connection_events::OpenInit) +} + +pub fn connection_open_try_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + connection_extract_attributes_from_tx(abci_event).map(connection_events::OpenTry) +} + +pub fn connection_open_ack_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + connection_extract_attributes_from_tx(abci_event).map(connection_events::OpenAck) +} + +pub fn connection_open_confirm_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + connection_extract_attributes_from_tx(abci_event).map(connection_events::OpenConfirm) +} + +pub fn channel_open_init_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::OpenInit::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn channel_open_try_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::OpenTry::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn channel_open_ack_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::OpenAck::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn channel_open_confirm_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::OpenConfirm::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn channel_close_init_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::CloseInit::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn channel_close_confirm_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + match channel_extract_attributes_from_tx(abci_event) { + Ok(attrs) => channel_events::CloseConfirm::try_from(attrs) + .map_err(|_| ChannelError::implementation_specific()), + Err(e) => Err(e), + } +} + +pub fn send_packet_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + extract_packet_and_write_ack_from_tx(abci_event) + .map(|(packet, write_ack)| { + // This event should not have a write ack. + debug_assert_eq!(write_ack.len(), 0); + channel_events::SendPacket { packet } + }) + .map_err(|_| ChannelError::abci_conversion_failed(abci_event.type_str.to_owned())) +} + +pub fn write_acknowledgement_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + extract_packet_and_write_ack_from_tx(abci_event) + .map(|(packet, write_ack)| channel_events::WriteAcknowledgement { + packet, + ack: write_ack, + }) + .map_err(|_| ChannelError::abci_conversion_failed(abci_event.type_str.to_owned())) +} + +pub fn acknowledge_packet_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + extract_packet_and_write_ack_from_tx(abci_event) + .map(|(packet, write_ack)| { + // This event should not have a write ack. + debug_assert_eq!(write_ack.len(), 0); + channel_events::AcknowledgePacket { packet } + }) + .map_err(|_| ChannelError::abci_conversion_failed(abci_event.type_str.to_owned())) +} + +pub fn timeout_packet_try_from_abci_event( + abci_event: &AbciEvent, +) -> Result { + extract_packet_and_write_ack_from_tx(abci_event) + .map(|(packet, write_ack)| { + // This event should not have a write ack. + debug_assert_eq!(write_ack.len(), 0); + channel_events::TimeoutPacket { packet } + }) + .map_err(|_| ChannelError::abci_conversion_failed(abci_event.type_str.to_owned())) +} + +fn client_extract_attributes_from_tx(event: &AbciEvent) -> Result { + let mut attr = ClientAttributes::default(); + + for tag in &event.attributes { + let key = tag.key.as_ref(); + let value = tag.value.as_ref(); + match key { + client_events::CLIENT_ID_ATTRIBUTE_KEY => { + attr.client_id = value + .parse() + .map_err(ClientError::invalid_client_identifier)? + } + client_events::CLIENT_TYPE_ATTRIBUTE_KEY => { + attr.client_type = value + .parse() + .map_err(|_| ClientError::unknown_client_type(value.to_string()))? + } + client_events::CONSENSUS_HEIGHT_ATTRIBUTE_KEY => { + attr.consensus_height = value + .parse() + .map_err(|e| ClientError::invalid_string_as_height(value.to_string(), e))? + } + _ => {} + } + } + + Ok(attr) +} + +pub fn extract_header_from_tx(event: &AbciEvent) -> Result { + for tag in &event.attributes { + let key = tag.key.as_ref(); + let value = tag.value.as_ref(); + if key == HEADER_ATTRIBUTE_KEY { + return AnyHeader::decode_from_string(value); + } + } + Err(ClientError::missing_raw_header()) +} + +fn connection_extract_attributes_from_tx( + event: &AbciEvent, +) -> Result { + let mut attr = ConnectionAttributes::default(); + + for tag in &event.attributes { + let key = tag.key.as_ref(); + let value = tag.value.as_ref(); + match key { + connection_events::CONN_ID_ATTRIBUTE_KEY => { + attr.connection_id = value.parse().ok(); + } + connection_events::CLIENT_ID_ATTRIBUTE_KEY => { + attr.client_id = value.parse().map_err(ConnectionError::invalid_identifier)?; + } + connection_events::COUNTERPARTY_CONN_ID_ATTRIBUTE_KEY => { + attr.counterparty_connection_id = value.parse().ok(); + } + connection_events::COUNTERPARTY_CLIENT_ID_ATTRIBUTE_KEY => { + attr.counterparty_client_id = + value.parse().map_err(ConnectionError::invalid_identifier)?; + } + _ => {} + } + } + + Ok(attr) +} + +fn channel_extract_attributes_from_tx( + event: &AbciEvent, +) -> Result { + let mut attr = ChannelAttributes::default(); + + for tag in &event.attributes { + let key = tag.key.as_ref(); + let value = tag.value.as_ref(); + match key { + channel_events::PORT_ID_ATTRIBUTE_KEY => { + attr.port_id = value.parse().map_err(ChannelError::identifier)? + } + channel_events::CHANNEL_ID_ATTRIBUTE_KEY => { + attr.channel_id = value.parse().ok(); + } + channel_events::CONNECTION_ID_ATTRIBUTE_KEY => { + attr.connection_id = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::COUNTERPARTY_PORT_ID_ATTRIBUTE_KEY => { + attr.counterparty_port_id = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::COUNTERPARTY_CHANNEL_ID_ATTRIBUTE_KEY => { + attr.counterparty_channel_id = value.parse().ok(); + } + _ => {} + } + } + + Ok(attr) +} + +fn extract_packet_and_write_ack_from_tx( + event: &AbciEvent, +) -> Result<(Packet, Vec), ChannelError> { + let mut packet = Packet::default(); + let mut write_ack: Vec = Vec::new(); + for tag in &event.attributes { + let key = tag.key.as_ref(); + let value = tag.value.as_ref(); + match key { + channel_events::PKT_SRC_PORT_ATTRIBUTE_KEY => { + packet.source_port = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::PKT_SRC_CHANNEL_ATTRIBUTE_KEY => { + packet.source_channel = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::PKT_DST_PORT_ATTRIBUTE_KEY => { + packet.destination_port = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::PKT_DST_CHANNEL_ATTRIBUTE_KEY => { + packet.destination_channel = value.parse().map_err(ChannelError::identifier)?; + } + channel_events::PKT_SEQ_ATTRIBUTE_KEY => { + packet.sequence = value + .parse::() + .map_err(|e| ChannelError::invalid_string_as_sequence(value.to_string(), e))? + .into() + } + channel_events::PKT_TIMEOUT_HEIGHT_ATTRIBUTE_KEY => { + packet.timeout_height = parse_timeout_height(value)?; + } + channel_events::PKT_TIMEOUT_TIMESTAMP_ATTRIBUTE_KEY => { + packet.timeout_timestamp = value.parse().unwrap(); + } + channel_events::PKT_DATA_ATTRIBUTE_KEY => { + packet.data = Vec::from(value.as_bytes()); + } + channel_events::PKT_ACK_ATTRIBUTE_KEY => { + write_ack = Vec::from(value.as_bytes()); + } + _ => {} + } + } + + Ok((packet, write_ack)) +} + +/// Parse a string into a timeout height expected to be stored in +/// `Packet.timeout_height`. We need to parse the timeout height differently +/// because of a quirk introduced in ibc-go. See comment in +/// `TryFrom for Packet`. +pub fn parse_timeout_height(s: &str) -> Result { + match s.parse::() { + Ok(height) => Ok(TimeoutHeight::from(height)), + Err(e) => match e.into_detail() { + HeightErrorDetail::ZeroHeight(_) => Ok(TimeoutHeight::no_timeout()), + _ => Err(ChannelError::invalid_timeout_height()), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use ibc::core::ics02_client::client_type::ClientType; + use ibc::core::ics02_client::header::Header; + use ibc::core::ics04_channel::packet::Sequence; + use ibc::mock::header::MockHeader; + use ibc::timestamp::Timestamp; + + #[test] + fn client_event_to_abci_event() { + let consensus_height = Height::new(1, 1).unwrap(); + let attributes = ClientAttributes { + client_id: "test_client".parse().unwrap(), + client_type: ClientType::Tendermint, + consensus_height, + }; + let mut abci_events = vec![]; + let create_client = client_events::CreateClient::from(attributes.clone()); + abci_events.push(AbciEvent::from(create_client.clone())); + let client_misbehaviour = client_events::ClientMisbehaviour::from(attributes.clone()); + abci_events.push(AbciEvent::from(client_misbehaviour.clone())); + let upgrade_client = client_events::UpgradeClient::from(attributes.clone()); + abci_events.push(AbciEvent::from(upgrade_client.clone())); + let mut update_client = client_events::UpdateClient::from(attributes); + let header = MockHeader::new(consensus_height).wrap_any(); + update_client.header = Some(header); + abci_events.push(AbciEvent::from(update_client.clone())); + + for abci_event in abci_events { + match ibc_event_try_from_abci_event(&abci_event).ok() { + Some(ibc_event) => match ibc_event { + IbcEvent::CreateClient(e) => assert_eq!(e.0, create_client.0), + IbcEvent::ClientMisbehaviour(e) => assert_eq!(e.0, client_misbehaviour.0), + IbcEvent::UpgradeClient(e) => assert_eq!(e.0, upgrade_client.0), + IbcEvent::UpdateClient(e) => { + assert_eq!(e.common, update_client.common); + assert_eq!(e.header, update_client.header); + } + _ => panic!("unexpected event type"), + }, + None => panic!("converted event was wrong"), + } + } + } + + #[test] + fn connection_event_to_abci_event() { + let attributes = ConnectionAttributes { + connection_id: Some("test_connection".parse().unwrap()), + client_id: "test_client".parse().unwrap(), + counterparty_connection_id: Some("counterparty_test_conn".parse().unwrap()), + counterparty_client_id: "counterparty_test_client".parse().unwrap(), + }; + let mut abci_events = vec![]; + let open_init = connection_events::OpenInit::from(attributes.clone()); + abci_events.push(AbciEvent::from(open_init.clone())); + let open_try = connection_events::OpenTry::from(attributes.clone()); + abci_events.push(AbciEvent::from(open_try.clone())); + let open_ack = connection_events::OpenAck::from(attributes.clone()); + abci_events.push(AbciEvent::from(open_ack.clone())); + let open_confirm = connection_events::OpenConfirm::from(attributes); + abci_events.push(AbciEvent::from(open_confirm.clone())); + + for abci_event in abci_events { + match ibc_event_try_from_abci_event(&abci_event).ok() { + Some(ibc_event) => match ibc_event { + IbcEvent::OpenInitConnection(e) => assert_eq!(e, open_init), + IbcEvent::OpenTryConnection(e) => assert_eq!(e, open_try), + IbcEvent::OpenAckConnection(e) => assert_eq!(e, open_ack), + IbcEvent::OpenConfirmConnection(e) => assert_eq!(e, open_confirm), + _ => panic!("unexpected event type"), + }, + None => panic!("converted event was wrong"), + } + } + } + + #[test] + fn channel_event_to_abci_event() { + let attributes = ChannelAttributes { + port_id: "test_port".parse().unwrap(), + channel_id: Some("channel-0".parse().unwrap()), + connection_id: "test_connection".parse().unwrap(), + counterparty_port_id: "counterparty_test_port".parse().unwrap(), + counterparty_channel_id: Some("channel-1".parse().unwrap()), + }; + let mut abci_events = vec![]; + let open_init = channel_events::OpenInit::try_from(attributes.clone()).unwrap(); + abci_events.push(AbciEvent::from(open_init.clone())); + let open_try = channel_events::OpenTry::try_from(attributes.clone()).unwrap(); + abci_events.push(AbciEvent::from(open_try.clone())); + let open_ack = channel_events::OpenAck::try_from(attributes.clone()).unwrap(); + abci_events.push(AbciEvent::from(open_ack.clone())); + let open_confirm = channel_events::OpenConfirm::try_from(attributes.clone()).unwrap(); + abci_events.push(AbciEvent::from(open_confirm.clone())); + let close_init = channel_events::CloseInit::try_from(attributes.clone()).unwrap(); + abci_events.push(AbciEvent::from(close_init.clone())); + let close_confirm = channel_events::CloseConfirm::try_from(attributes).unwrap(); + abci_events.push(AbciEvent::from(close_confirm.clone())); + + for abci_event in abci_events { + match ibc_event_try_from_abci_event(&abci_event).ok() { + Some(ibc_event) => match ibc_event { + IbcEvent::OpenInitChannel(e) => { + assert_eq!(ChannelAttributes::from(e), open_init.clone().into()) + } + IbcEvent::OpenTryChannel(e) => { + assert_eq!(ChannelAttributes::from(e), open_try.clone().into()) + } + IbcEvent::OpenAckChannel(e) => { + assert_eq!(ChannelAttributes::from(e), open_ack.clone().into()) + } + IbcEvent::OpenConfirmChannel(e) => { + assert_eq!(ChannelAttributes::from(e), open_confirm.clone().into()) + } + IbcEvent::CloseInitChannel(e) => { + assert_eq!(ChannelAttributes::from(e), close_init.clone().into()) + } + IbcEvent::CloseConfirmChannel(e) => { + assert_eq!(ChannelAttributes::from(e), close_confirm.clone().into()) + } + _ => panic!("unexpected event type"), + }, + None => panic!("converted event was wrong"), + } + } + } + + #[test] + fn packet_event_to_abci_event() { + let packet = Packet { + sequence: Sequence::from(10), + source_port: "a_test_port".parse().unwrap(), + source_channel: "channel-0".parse().unwrap(), + destination_port: "b_test_port".parse().unwrap(), + destination_channel: "channel-1".parse().unwrap(), + data: "test_data".as_bytes().to_vec(), + timeout_height: Height::new(1, 10).unwrap().into(), + timeout_timestamp: Timestamp::now(), + }; + let mut abci_events = vec![]; + let send_packet = channel_events::SendPacket { + packet: packet.clone(), + }; + abci_events.push(AbciEvent::try_from(send_packet.clone()).unwrap()); + let write_ack = channel_events::WriteAcknowledgement { + packet: packet.clone(), + ack: "test_ack".as_bytes().to_vec(), + }; + abci_events.push(AbciEvent::try_from(write_ack.clone()).unwrap()); + let ack_packet = channel_events::AcknowledgePacket { + packet: packet.clone(), + }; + abci_events.push(AbciEvent::try_from(ack_packet.clone()).unwrap()); + let timeout_packet = channel_events::TimeoutPacket { packet }; + abci_events.push(AbciEvent::try_from(timeout_packet.clone()).unwrap()); + + for abci_event in abci_events { + match ibc_event_try_from_abci_event(&abci_event).ok() { + Some(ibc_event) => match ibc_event { + IbcEvent::SendPacket(e) => assert_eq!(e.packet, send_packet.packet), + IbcEvent::WriteAcknowledgement(e) => { + assert_eq!(e.packet, write_ack.packet); + assert_eq!(e.ack, write_ack.ack); + } + IbcEvent::AcknowledgePacket(e) => assert_eq!(e.packet, ack_packet.packet), + IbcEvent::TimeoutPacket(e) => assert_eq!(e.packet, timeout_packet.packet), + _ => panic!("unexpected event type"), + }, + None => panic!("converted event was wrong"), + } + } + } +} diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 70f12cfcf1..c23a2f68d6 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -32,6 +32,8 @@ use crate::{ mod error; pub use error::*; +use super::IbcEventWithHeight; + pub type Result = core::result::Result; mod retry_strategy { @@ -55,7 +57,7 @@ pub struct EventBatch { pub chain_id: ChainId, pub tracking_id: TrackingId, pub height: Height, - pub events: Vec, + pub events: Vec, } type SubscriptionResult = core::result::Result; @@ -420,7 +422,7 @@ impl EventMonitor { fn collect_events( chain_id: &ChainId, event: RpcEvent, -) -> impl Stream> { +) -> impl Stream> { let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default(); stream::iter(events).map(Ok) } @@ -439,22 +441,20 @@ fn stream_batches( .try_flatten(); // Group events by height - let grouped = try_group_while(events, |(h0, _), (h1, _)| h0 == h1); + let grouped = try_group_while(events, |ev0, ev1| ev0.height == ev1.height); // Convert each group to a batch - grouped.map_ok(move |events| { - let height = events + grouped.map_ok(move |mut events_with_heights| { + let height = events_with_heights .first() - .map(|(h, _)| h) - .copied() + .map(|ev_with_height| ev_with_height.height) .expect("internal error: found empty group"); // SAFETY: upheld by `group_while` - let mut events = events.into_iter().map(|(_, e)| e).collect::>(); - sort_events(&mut events); + sort_events(&mut events_with_heights); EventBatch { height, - events, + events: events_with_heights, chain_id: chain_id.clone(), tracking_id: TrackingId::new_uuid(), } @@ -463,8 +463,8 @@ fn stream_batches( /// Sort the given events by putting the NewBlock event first, /// and leaving the other events as is. -fn sort_events(events: &mut [IbcEvent]) { - events.sort_by(|a, b| match (a, b) { +fn sort_events(events: &mut [IbcEventWithHeight]) { + events.sort_by(|a, b| match (&a.event, &b.event) { (IbcEvent::NewBlock(_), _) => Ordering::Less, _ => Ordering::Equal, }) diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index cbf592a48b..4026da80ab 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -8,9 +8,11 @@ use ibc::core::ics04_channel::events as ChannelEvents; use ibc::core::ics24_host::identifier::ChainId; use ibc::events::IbcEvent; -use crate::chain::cosmos::types::events::{self, channel::RawObject}; +use crate::chain::cosmos::types::events::channel::RawObject; use crate::event::monitor::queries; +use super::{ibc_event_try_from_abci_event, IbcEventWithHeight}; + /// Extract IBC events from Tendermint RPC events /// /// Events originate from the following ABCI methods -> @@ -114,8 +116,8 @@ use crate::event::monitor::queries; pub fn get_all_events( chain_id: &ChainId, result: RpcEvent, -) -> Result, String> { - let mut vals: Vec<(Height, IbcEvent)> = vec![]; +) -> Result, String> { + let mut events_with_height: Vec = vec![]; let RpcEvent { data, events, @@ -131,8 +133,11 @@ pub fn get_all_events( ) .map_err(|_| String::from("tx.height: invalid header height of 0"))?; - vals.push((height, ClientEvents::NewBlock::new(height).into())); - vals.append(&mut extract_block_events(height, &events)); + events_with_height.push(IbcEventWithHeight::new( + ClientEvents::NewBlock::new(height).into(), + height, + )); + events_with_height.append(&mut extract_block_events(height, &events)); } RpcEventData::Tx { tx_result } => { let height = Height::new( @@ -142,26 +147,23 @@ pub fn get_all_events( .map_err(|_| String::from("tx_result.height: invalid header height of 0"))?; for abci_event in &tx_result.result.events { - if query == queries::ibc_client().to_string() { - if let Some(mut client_event) = events::client::try_from_tx(abci_event) { - client_event.set_height(height); - tracing::trace!("extracted ibc_client event {}", client_event); - vals.push((height, client_event)); - } - } - if query == queries::ibc_connection().to_string() { - if let Some(mut conn_event) = events::connection::try_from_tx(abci_event) { - conn_event.set_height(height); - tracing::trace!("extracted ibc_connection event {}", conn_event); - vals.push((height, conn_event)); - } - } - if query == queries::ibc_channel().to_string() { - if let Some(mut chan_event) = events::channel::try_from_tx(abci_event) { - chan_event.set_height(height); + if let Ok(ibc_event) = ibc_event_try_from_abci_event(abci_event) { + if query == queries::ibc_client().to_string() + && event_is_type_client(&ibc_event) + { + tracing::trace!("extracted ibc_client event {}", ibc_event); + events_with_height.push(IbcEventWithHeight::new(ibc_event, height)); + } else if query == queries::ibc_connection().to_string() + && event_is_type_connection(&ibc_event) + { + tracing::trace!("extracted ibc_connection event {}", ibc_event); + events_with_height.push(IbcEventWithHeight::new(ibc_event, height)); + } else if query == queries::ibc_channel().to_string() + && event_is_type_channel(&ibc_event) + { let _span = tracing::trace_span!("ibc_channel event").entered(); - tracing::trace!("extracted {}", chan_event); - if matches!(chan_event, IbcEvent::SendPacket(_)) { + tracing::trace!("extracted {}", ibc_event); + if matches!(ibc_event, IbcEvent::SendPacket(_)) { // Should be the same as the hash of tx_result.tx? if let Some(hash) = events.get("tx.hash").and_then(|values| values.get(0)) @@ -169,7 +171,8 @@ pub fn get_all_events( tracing::trace!(event = "SendPacket", "tx hash: {}", hash); } } - vals.push((height, chan_event)); + + events_with_height.push(IbcEventWithHeight::new(ibc_event, height)); } } } @@ -177,13 +180,51 @@ pub fn get_all_events( _ => {} } - Ok(vals) + Ok(events_with_height) +} + +fn event_is_type_client(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::CreateClient(_) + | IbcEvent::UpdateClient(_) + | IbcEvent::UpgradeClient(_) + | IbcEvent::ClientMisbehaviour(_) + ) +} + +fn event_is_type_connection(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::OpenInitConnection(_) + | IbcEvent::OpenTryConnection(_) + | IbcEvent::OpenAckConnection(_) + | IbcEvent::OpenConfirmConnection(_) + ) +} + +fn event_is_type_channel(ev: &IbcEvent) -> bool { + matches!( + ev, + IbcEvent::OpenInitChannel(_) + | IbcEvent::OpenTryChannel(_) + | IbcEvent::OpenAckChannel(_) + | IbcEvent::OpenConfirmChannel(_) + | IbcEvent::CloseInitChannel(_) + | IbcEvent::CloseConfirmChannel(_) + | IbcEvent::SendPacket(_) + | IbcEvent::ReceivePacket(_) + | IbcEvent::WriteAcknowledgement(_) + | IbcEvent::AcknowledgePacket(_) + | IbcEvent::TimeoutPacket(_) + | IbcEvent::TimeoutOnClosePacket(_) + ) } fn extract_block_events( height: Height, block_events: &HashMap>, -) -> Vec<(Height, IbcEvent)> { +) -> Vec { #[inline] fn extract_events<'a, T: TryFrom>>( height: Height, @@ -205,19 +246,19 @@ fn extract_block_events( #[inline] fn append_events>( - events: &mut Vec<(Height, IbcEvent)>, + events: &mut Vec, chan_events: Vec, height: Height, ) { events.append( &mut chan_events .into_iter() - .map(|ev| (height, ev.into())) + .map(|ev| IbcEventWithHeight::new(ev.into(), height)) .collect(), ); } - let mut events: Vec<(Height, IbcEvent)> = vec![]; + let mut events: Vec = vec![]; append_events::( &mut events, extract_events(height, block_events, "channel_open_init", "channel_id"), diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 582e13cfb7..d26057237e 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -43,6 +43,7 @@ use crate::chain::requests::{ }; use crate::chain::tracking::TrackedMsgs; use crate::error::Error as RelayerError; +use crate::event::IbcEventWithHeight; use crate::telemetry; const MAX_MISBEHAVIOUR_CHECK_DURATION: Duration = Duration::from_secs(120); @@ -531,7 +532,10 @@ impl ForeignClient ForeignClient Result { + ) -> Result { let new_msg = self.build_create_client(options)?; let res = self @@ -654,15 +658,15 @@ impl ForeignClient Result<(), ForeignClientError> { - let event = self + let event_with_height = self .build_create_client_and_send(CreateOptions::default()) .map_err(|e| { error!("[{}] failed CreateClient: {}", self, e); e })?; - self.id = extract_client_id(&event)?.clone(); - info!("🍭 [{}] => {:#?}\n", self, event); + self.id = extract_client_id(&event_with_height.event)?.clone(); + info!("🍭 [{}] => {:#?}\n", self, event_with_height); Ok(()) } @@ -1141,7 +1145,7 @@ impl ForeignClient ForeignClient Result, ForeignClientError> { - let mut events = vec![]; + let mut events_with_heights = vec![]; for i in 0..MAX_RETRIES { thread::sleep(Duration::from_millis(100)); let result = self @@ -1192,14 +1196,14 @@ impl ForeignClient { - events = result; + events_with_heights = result; // Should break to prevent retrying uselessly. break; } } } - if events.is_empty() { + if events_with_heights.is_empty() { return Ok(None); } @@ -1207,7 +1211,7 @@ impl ForeignClient IbcEvent::UpdateClient).ok_or_else(|| { ForeignClientError::unexpected_event( self.id().clone(), @@ -1309,7 +1313,7 @@ impl ForeignClient, + mut update: Option<&UpdateClient>, ) -> Result, ForeignClientError> { thread::sleep(Duration::from_millis(100)); let span_guard = update.as_ref().map(|ev| ev.consensus_height()); @@ -1338,7 +1342,7 @@ impl ForeignClient ForeignClient ForeignClient, + update_event: Option<&UpdateClient>, ) -> MisbehaviourResults { // check evidence of misbehaviour for all updates or one - let result = match self.detect_misbehaviour(update_event.clone()) { + let result = match self.detect_misbehaviour(update_event) { Err(e) => Err(e), Ok(None) => Ok(vec![]), // no evidence found Ok(Some(detected)) => { @@ -1662,7 +1666,7 @@ mod test { "build_create_client_and_send failed (chain a) with error {:?}", res ); - assert!(matches!(res.unwrap(), IbcEvent::CreateClient(_))); + assert!(matches!(res.unwrap().event, IbcEvent::CreateClient(_))); // Create the client on chain b let res = b_client.build_create_client_and_send(Default::default()); @@ -1671,7 +1675,7 @@ mod test { "build_create_client_and_send failed (chain b) with error {:?}", res ); - assert!(matches!(res.unwrap(), IbcEvent::CreateClient(_))); + assert!(matches!(res.unwrap().event, IbcEvent::CreateClient(_))); } /// Basic test for the `build_update_client_and_send` & `build_create_client_and_send` methods. diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index c38f445348..b3767fbaf4 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -7,7 +7,6 @@ use tracing::{debug, info}; use ibc::core::ics02_client::client_state::ClientState; use ibc::core::ics04_channel::context::calculate_block_delay; -use ibc::events::IbcEvent; use ibc::Height; use crate::chain::handle::ChainHandle; @@ -16,6 +15,7 @@ use crate::chain::requests::QueryClientStateRequest; use crate::chain::requests::QueryHeight; use crate::chain::tracking::TrackedMsgs; use crate::chain::tracking::TrackingId; +use crate::event::IbcEventWithHeight; use crate::link::error::LinkError; use crate::link::RelayPath; @@ -37,15 +37,15 @@ impl fmt::Display for OperationalDataTarget { } } -/// A set of [`IbcEvent`]s that have an associated +/// A set of [`IbcEventWithHeight`]s that have an associated /// tracking number to ensure better observability. pub struct TrackedEvents { - events: Vec, + events: Vec, tracking_id: TrackingId, } impl TrackedEvents { - pub fn new(events: Vec, tracking_id: TrackingId) -> Self { + pub fn new(events: Vec, tracking_id: TrackingId) -> Self { Self { events, tracking_id, @@ -56,7 +56,7 @@ impl TrackedEvents { self.events.is_empty() } - pub fn events(&self) -> &[IbcEvent] { + pub fn events(&self) -> &[IbcEventWithHeight] { &self.events } @@ -76,7 +76,7 @@ impl TrackedEvents { /// alongside the event which generated it. #[derive(Clone)] pub struct TransitMessage { - pub event: IbcEvent, + pub event_with_height: IbcEventWithHeight, pub msg: Any, } @@ -139,7 +139,11 @@ impl OperationalData { /// Transforms `self` into the list of events accompanied with the tracking ID. pub fn into_events(self) -> TrackedEvents { - let events = self.batch.into_iter().map(|gm| gm.event).collect(); + let events = self + .batch + .into_iter() + .map(|gm| gm.event_with_height) + .collect(); TrackedEvents { events, @@ -223,7 +227,9 @@ impl OperationalData { /// Returns true iff the batch contains a packet event fn has_packet_msgs(&self) -> bool { - self.batch.iter().any(|msg| msg.event.packet().is_some()) + self.batch + .iter() + .any(|msg| msg.event_with_height.event.packet().is_some()) } /// Returns the `connection_delay` iff the connection delay for this relaying path is non-zero diff --git a/relayer/src/link/packet_events.rs b/relayer/src/link/packet_events.rs index 9b086400b0..8717c8a7ed 100644 --- a/relayer/src/link/packet_events.rs +++ b/relayer/src/link/packet_events.rs @@ -10,6 +10,7 @@ use crate::chain::handle::ChainHandle; use crate::chain::requests::{ QueryBlockRequest, QueryHeight, QueryPacketEventDataRequest, QueryTxRequest, }; +use crate::event::IbcEventWithHeight; use crate::link::error::LinkError; use crate::path::PathIdentifiers; @@ -24,7 +25,7 @@ pub fn query_packet_events_with<'a, ChainA>( path: &'a PathIdentifiers, query_fn: impl Fn(&ChainA, &PathIdentifiers, Vec, Height) -> Result, LinkError> + 'a, -) -> impl Iterator> + 'a +) -> impl Iterator> + 'a where ChainA: ChainHandle, { @@ -36,15 +37,11 @@ where .map_while(move |c| { let sequences_nrs_chunk = c.to_vec(); match query_fn(src_chain, path, sequences_nrs_chunk, query_height) { - Ok(mut events) => { + Ok(events) => { events_left_count -= c.len(); info!(events_total = %events_total_count, events_left = %events_left_count, "pulled packet data for {} events;", events.len()); - for event in events.iter_mut() { - event.set_height(query_height); - } - - Some(events) + Some(events.into_iter().map(|ev| IbcEventWithHeight::new(ev, query_height)).collect()) }, Err(e) => { warn!("encountered query failure while pulling packet data: {}", e); @@ -75,9 +72,12 @@ pub fn query_send_packet_events( height: QueryHeight::Specific(src_query_height), }; - let tx_events = src_chain + let tx_events: Vec = src_chain .query_txs(QueryTxRequest::Packet(query.clone())) - .map_err(LinkError::relayer)?; + .map_err(LinkError::relayer)? + .into_iter() + .map(|ev_with_height| ev_with_height.event) + .collect(); let recvd_sequences: Vec = tx_events .iter() @@ -132,5 +132,5 @@ pub fn query_write_ack_events( })) .map_err(|e| LinkError::query(src_chain.id(), e))?; - Ok(events_result) + Ok(events_result.into_iter().map(|ev| ev.event).collect()) } diff --git a/relayer/src/link/pending.rs b/relayer/src/link/pending.rs index 7d482c97db..63bac4ea98 100644 --- a/relayer/src/link/pending.rs +++ b/relayer/src/link/pending.rs @@ -139,7 +139,7 @@ impl PendingTxs { all_events.append(&mut events) } } - Ok(Some(all_events)) + Ok(Some(all_events.into_iter().map(|ev| ev.event).collect())) } /// Try and process one pending transaction within the given timeout duration if one diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index e72adecdba..fba2601d4d 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -26,6 +26,8 @@ use crate::chain::tracking::TrackingId; use crate::channel::error::ChannelError; use crate::channel::Channel; use crate::event::monitor::EventBatch; +use crate::event::IbcEventWithHeight; +use crate::event::PrettyEvents; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::link::error::{self, LinkError}; use crate::link::operational_data::{ @@ -43,10 +45,7 @@ use crate::telemetry; use crate::util::queue::Queue; use ibc::{ core::{ - ics02_client::{ - events::ClientMisbehaviour as ClientMisbehaviourEvent, - events::UpdateClient as UpdateClientEvent, - }, + ics02_client::events::ClientMisbehaviour as ClientMisbehaviourEvent, ics04_channel::{ channel::{ChannelEnd, Order, State as ChannelState}, events::{SendPacket, WriteAcknowledgement}, @@ -59,7 +58,7 @@ use ibc::{ }, ics24_host::identifier::{ChannelId, ClientId, ConnectionId, PortId}, }, - events::{IbcEvent, PrettyEvents, WithBlockDataType}, + events::{IbcEvent, WithBlockDataType}, signer::Signer, timestamp::Timestamp, tx_msg::Msg, @@ -331,11 +330,14 @@ impl RelayPath { .map_err(LinkError::client) } - fn build_chan_close_confirm_from_event(&self, event: &IbcEvent) -> Result { + fn build_chan_close_confirm_from_event( + &self, + event: &IbcEventWithHeight, + ) -> Result { let src_channel_id = self.src_channel_id(); let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), src_channel_id, event.height()) + .build_channel_proofs(self.src_port_id(), src_channel_id, event.height) .map_err(|e| LinkError::channel(ChannelError::channel_proof(e)))?; // Build the domain type message @@ -353,41 +355,41 @@ impl RelayPath { /// Only events for a port/channel matching one of the channel ends should be processed. fn filter_relaying_events( &self, - events: Vec, + events: Vec, tracking_id: TrackingId, ) -> TrackedEvents { let src_channel_id = self.src_channel_id(); let mut result = vec![]; - for event in events.into_iter() { - match &event { + for event_with_height in events.into_iter() { + match &event_with_height.event { IbcEvent::SendPacket(send_packet_ev) => { if src_channel_id == send_packet_ev.src_channel_id() && self.src_port_id() == send_packet_ev.src_port_id() { - result.push(event); + result.push(event_with_height); } } IbcEvent::WriteAcknowledgement(write_ack_ev) => { if src_channel_id == write_ack_ev.dst_channel_id() && self.src_port_id() == write_ack_ev.dst_port_id() { - result.push(event); + result.push(event_with_height); } } IbcEvent::CloseInitChannel(chan_close_ev) => { if src_channel_id == chan_close_ev.channel_id() && self.src_port_id() == chan_close_ev.port_id() { - result.push(event); + result.push(event_with_height); } } IbcEvent::TimeoutPacket(timeout_ev) => { if src_channel_id == timeout_ev.src_channel_id() && self.channel.src_port_id() == timeout_ev.src_port_id() { - result.push(event); + result.push(event_with_height); } } _ => {} @@ -442,8 +444,8 @@ impl RelayPath { // Update telemetry info telemetry!({ - for e in events.events() { - self.backlog_update(e); + for event_with_height in events.events() { + self.backlog_update(&event_with_height.event); } }); @@ -488,7 +490,7 @@ impl RelayPath { let input = events.events(); let src_height = match input.get(0) { None => return Ok((None, None)), - Some(ev) => ev.height(), + Some(ev) => ev.height, }; let dst_latest_info = self @@ -514,13 +516,14 @@ impl RelayPath { self.channel.connection_delay, ); - for event in input { - trace!("processing event: {}", event); - let (dst_msg, src_msg) = match event { - IbcEvent::CloseInitChannel(_) => { - (Some(self.build_chan_close_confirm_from_event(event)?), None) - } - IbcEvent::TimeoutPacket(ref timeout_ev) => { + for event_with_height in input { + trace!("processing event: {}", event_with_height); + let (dst_msg, src_msg) = match &event_with_height.event { + IbcEvent::CloseInitChannel(_) => ( + Some(self.build_chan_close_confirm_from_event(event_with_height)?), + None, + ), + IbcEvent::TimeoutPacket(_) => { // When a timeout packet for an ordered channel is processed on-chain (src here) // the chain closes the channel but no close init event is emitted, instead // we get a timeout packet event (this happens for both unordered and ordered channels) @@ -528,10 +531,13 @@ impl RelayPath { // to the counterparty. if self.ordered_channel() && self - .src_channel(QueryHeight::Specific(timeout_ev.height))? + .src_channel(QueryHeight::Specific(event_with_height.height))? .state_matches(&ChannelState::Closed) { - (Some(self.build_chan_close_confirm_from_event(event)?), None) + ( + Some(self.build_chan_close_confirm_from_event(event_with_height)?), + None, + ) } else { (None, None) } @@ -544,6 +550,7 @@ impl RelayPath { self.build_recv_or_timeout_from_send_packet_event( send_packet_ev, &dst_latest_info, + event_with_height.height, )? } } @@ -557,7 +564,10 @@ impl RelayPath { debug!("{} already handled", write_ack_ev); (None, None) } else { - (self.build_ack_from_recv_event(write_ack_ev)?, None) + ( + self.build_ack_from_recv_event(write_ack_ev, event_with_height.height)?, + None, + ) } } _ => (None, None), @@ -565,9 +575,9 @@ impl RelayPath { // Collect messages to be sent to the destination chain (e.g., RecvPacket) if let Some(msg) = dst_msg { - debug!("{} from {}", msg.type_url, event); + debug!("{} from {}", msg.type_url, event_with_height); dst_od.batch.push(TransitMessage { - event: event.clone(), + event_with_height: event_with_height.clone(), msg, }); } @@ -577,9 +587,9 @@ impl RelayPath { // For Ordered channels a single timeout event should be sent as this closes the channel. // Otherwise a multi message transaction will fail. if self.unordered_channel() || src_od.batch.is_empty() { - debug!("{} from {}", msg.type_url, event); + debug!("{} from {}", msg.type_url, event_with_height); src_od.batch.push(TransitMessage { - event: event.clone(), + event_with_height: event_with_height.clone(), msg, }); } @@ -843,7 +853,7 @@ impl RelayPath { client_id: ClientId, consensus_height: Height, ) -> Result { - let events = chain + let events_with_heights = chain .query_txs(QueryTxRequest::Client(QueryClientEventRequest { query_height: QueryHeight::Latest, event_id: WithBlockDataType::UpdateClient, @@ -856,9 +866,14 @@ impl RelayPath { // but the `processed_height` is the height at which the first `UpdateClient` event for this // consensus state/height was emitted. We expect that these events are received in the exact // same order in which they were emitted. - match events.first() { - Some(IbcEvent::UpdateClient(event)) => Ok(event.height()), - Some(event) => Err(LinkError::unexpected_event(event.clone())), + match events_with_heights.first() { + Some(event_with_height) => { + if matches!(&event_with_height.event, IbcEvent::UpdateClient(_)) { + Ok(event_with_height.height) + } else { + Err(LinkError::unexpected_event(event_with_height.event.clone())) + } + } None => Err(LinkError::update_client_event_not_found()), } } @@ -869,20 +884,20 @@ impl RelayPath { /// multiple variants with a single pass. #[inline] fn event_per_type( - mut tx_events: Vec, + mut tx_events: Vec, ) -> ( Option, - Option, + Option, Option, ) { let mut error = None; let mut update = None; let mut misbehaviour = None; - while let Some(event) = tx_events.pop() { - match event { - IbcEvent::ChainError(_) => error = Some(event), - IbcEvent::UpdateClient(event) => update = Some(event), + while let Some(event_with_height) = tx_events.pop() { + match event_with_height.event { + IbcEvent::ChainError(_) => error = Some(event_with_height.event), + IbcEvent::UpdateClient(_) => update = Some(event_with_height.height), IbcEvent::ClientMisbehaviour(event) => misbehaviour = Some(event), _ => {} } @@ -941,12 +956,12 @@ impl RelayPath { .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&dst_tx_events)); + info!("result: {}", PrettyEvents(dst_tx_events.as_slice())); let (error, update, misbehaviour) = Self::event_per_type(dst_tx_events); match (error, update, misbehaviour) { // All updates were successful, no errors and no misbehaviour. - (None, Some(update_event), None) => Ok(update_event.height()), + (None, Some(update_event_height), None) => Ok(update_event_height), (Some(chain_error), _, _) => { // Atleast one chain-error so retry if possible. if retries_left == 0 { @@ -1008,12 +1023,12 @@ impl RelayPath { .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("result: {}", PrettyEvents(&src_tx_events)); + info!("result: {}", PrettyEvents(src_tx_events.as_slice())); let (error, update, misbehaviour) = Self::event_per_type(src_tx_events); match (error, update, misbehaviour) { // All updates were successful, no errors and no misbehaviour. - (None, Some(update_event), None) => Ok(update_event.height()), + (None, Some(update_event_height), None) => Ok(update_event_height), (Some(chain_error), _, _) => { // Atleast one chain-error so retry if possible. if retries_left == 0 { @@ -1090,8 +1105,8 @@ impl RelayPath { ) { // Update telemetry info telemetry!({ - for e in events_chunk.clone() { - self.record_cleared_send_packet(e); + for event_with_height in events_chunk.iter() { + self.record_cleared_send_packet(event_with_height); } }); self.events_to_operational_data(TrackedEvents::new(events_chunk, tracking_id))?; @@ -1138,7 +1153,7 @@ impl RelayPath { &self.path_id, query_write_ack_events, ) { - telemetry!(self.record_cleared_acknowledgments(events_chunk.clone())); + telemetry!(self.record_cleared_acknowledgments(events_chunk.iter())); self.events_to_operational_data(TrackedEvents::new(events_chunk, tracking_id))?; } @@ -1171,6 +1186,7 @@ impl RelayPath { fn build_ack_from_recv_event( &self, event: &WriteAcknowledgement, + height: Height, ) -> Result, LinkError> { let packet = event.packet.clone(); @@ -1181,7 +1197,7 @@ impl RelayPath { &packet.destination_port, &packet.destination_channel, packet.sequence, - event.height, + height, ) .map_err(|e| LinkError::packet_proofs_constructor(self.src_chain().id(), e))?; @@ -1308,12 +1324,13 @@ impl RelayPath { &self, event: &SendPacket, dst_info: &ChainStatus, + height: Height, ) -> Result<(Option, Option), LinkError> { let timeout = self.build_timeout_from_send_packet_event(event, dst_info)?; if timeout.is_some() { Ok((None, timeout)) } else { - Ok((self.build_recv_packet(&event.packet, event.height)?, None)) + Ok((self.build_recv_packet(&event.packet, height)?, None)) } } @@ -1524,9 +1541,11 @@ impl RelayPath { let mut retain_batch = vec![]; for gm in odata.batch.iter() { - let TransitMessage { event, .. } = gm; + let TransitMessage { + event_with_height, .. + } = gm; - match event { + match &event_with_height.event { IbcEvent::SendPacket(e) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(e)? { @@ -1546,7 +1565,7 @@ impl RelayPath { ) }) .push(TransitMessage { - event: event.clone(), + event_with_height: event_with_height.clone(), msg: new_msg, }); } else { @@ -1771,11 +1790,11 @@ impl RelayPath { } #[cfg(feature = "telemetry")] - fn record_cleared_send_packet(&self, event: IbcEvent) { - if let IbcEvent::SendPacket(send_packet_ev) = event { + fn record_cleared_send_packet(&self, event_with_height: &IbcEventWithHeight) { + if let IbcEvent::SendPacket(send_packet_ev) = &event_with_height.event { ibc_telemetry::global().send_packet_events( send_packet_ev.packet.sequence.into(), - send_packet_ev.height().revision_height(), + event_with_height.height.revision_height(), &self.src_chain().id(), self.src_channel_id(), self.src_port_id(), @@ -1783,7 +1802,7 @@ impl RelayPath { ); ibc_telemetry::global().cleared_send_packet_events( send_packet_ev.packet.sequence.into(), - send_packet_ev.height().revision_height(), + event_with_height.height.revision_height(), &self.src_chain().id(), self.src_channel_id(), self.src_port_id(), @@ -1793,12 +1812,15 @@ impl RelayPath { } #[cfg(feature = "telemetry")] - fn record_cleared_acknowledgments(&self, events: Vec) { - for e in events { - if let IbcEvent::WriteAcknowledgement(write_ack_ev) = e { + fn record_cleared_acknowledgments<'a>( + &self, + events_with_heights: impl Iterator, + ) { + for event_with_height in events_with_heights { + if let IbcEvent::WriteAcknowledgement(write_ack_ev) = &event_with_height.event { ibc_telemetry::global().cleared_acknowledgment_events( write_ack_ev.packet.sequence.into(), - write_ack_ev.height().revision_height(), + event_with_height.height.revision_height(), &self.dst_chain().id(), self.src_channel_id(), self.src_port_id(), diff --git a/relayer/src/link/relay_sender.rs b/relayer/src/link/relay_sender.rs index 28b34cd3fc..a6e45b78b7 100644 --- a/relayer/src/link/relay_sender.rs +++ b/relayer/src/link/relay_sender.rs @@ -3,10 +3,11 @@ use core::fmt; use tendermint_rpc::endpoint::broadcast::tx_sync; use tracing::info; -use ibc::events::{IbcEvent, PrettyEvents}; +use ibc::events::IbcEvent; use crate::chain::handle::ChainHandle; use crate::chain::tracking::TrackedMsgs; +use crate::event::PrettyEvents; use crate::link::error::LinkError; use crate::link::RelaySummary; @@ -52,17 +53,22 @@ impl Submit for SyncSender { info!( "[Sync->{}] result {}\n", target.id(), - PrettyEvents(&tx_events) + PrettyEvents(tx_events.as_slice()) ); let ev = tx_events .clone() .into_iter() - .find(|event| matches!(event, IbcEvent::ChainError(_))); + .find(|event_with_height| matches!(event_with_height.event, IbcEvent::ChainError(_))); match ev { - Some(ev) => Err(LinkError::send(ev)), - None => Ok(RelaySummary::from_events(tx_events)), + Some(ev) => Err(LinkError::send(ev.event)), + None => Ok(RelaySummary::from_events( + tx_events + .into_iter() + .map(|event_with_height| event_with_height.event) + .collect(), + )), } } } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index cb14caa23d..b58d241548 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -18,7 +18,10 @@ use ibc::{ use crate::{ chain::{endpoint::HealthCheck, handle::ChainHandle, tracking::TrackingId}, config::Config, - event::monitor::{self, Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + event::{ + monitor::{self, Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + IbcEventWithHeight, + }, object::Object, registry::{Registry, SharedRegistry}, rest, @@ -356,7 +359,7 @@ fn relay_on_object( /// and add the given `event` to the `collected` events for this `object`. fn collect_event( collected: &mut CollectedEvents, - event: &IbcEvent, + event_with_height: IbcEventWithHeight, enabled: bool, object_ctor: F, ) where @@ -368,7 +371,7 @@ fn collect_event( .per_object .entry(object) .or_default() - .push(event.clone()); + .push(event_with_height); } } } @@ -384,75 +387,118 @@ pub fn collect_events( let mode = config.mode; - for event in &batch.events { - match event { + for event_with_height in &batch.events { + match &event_with_height.event { IbcEvent::NewBlock(_) => { - collected.new_block = Some(event.clone()); + collected.new_block = Some(event_with_height.event.clone()); } - IbcEvent::UpdateClient(ref update) => { - collect_event(&mut collected, event, mode.clients.enabled, || { - // Collect update client events only if the worker exists - if let Ok(object) = Object::for_update_client(update, src_chain) { - workers.contains(&object).then(|| object) - } else { - None - } - }); + IbcEvent::UpdateClient(update) => { + collect_event( + &mut collected, + event_with_height.clone(), + mode.clients.enabled, + || { + // Collect update client events only if the worker exists + if let Ok(object) = Object::for_update_client(update, src_chain) { + workers.contains(&object).then(|| object) + } else { + None + } + }, + ); } IbcEvent::OpenInitConnection(..) | IbcEvent::OpenTryConnection(..) | IbcEvent::OpenAckConnection(..) => { - collect_event(&mut collected, event, mode.connections.enabled, || { - event.connection_attributes().and_then(|attr| { - Object::connection_from_conn_open_events(attr, src_chain).ok() - }) - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.connections.enabled, + || { + event_with_height + .event + .connection_attributes() + .and_then(|attr| { + Object::connection_from_conn_open_events(attr, src_chain).ok() + }) + }, + ); } IbcEvent::OpenInitChannel(..) | IbcEvent::OpenTryChannel(..) => { - collect_event(&mut collected, event, mode.channels.enabled, || { - event.clone().channel_attributes().and_then(|attr| { - Object::channel_from_chan_open_events(&attr, src_chain).ok() - }) - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.channels.enabled, + || { + event_with_height + .event + .clone() + .channel_attributes() + .and_then(|attr| { + Object::channel_from_chan_open_events(&attr, src_chain).ok() + }) + }, + ); } IbcEvent::OpenAckChannel(open_ack) => { // Create client and packet workers here as channel end must be opened let attributes = open_ack.clone().into(); - collect_event(&mut collected, event, mode.clients.enabled, || { - Object::client_from_chan_open_events(&attributes, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.clients.enabled, + || Object::client_from_chan_open_events(&attributes, src_chain).ok(), + ); // If handshake message relaying is enabled create worker to send the MsgChannelOpenConfirm message - collect_event(&mut collected, event, mode.channels.enabled, || { - Object::channel_from_chan_open_events(&attributes, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.channels.enabled, + || Object::channel_from_chan_open_events(&attributes, src_chain).ok(), + ); } IbcEvent::OpenConfirmChannel(open_confirm) => { let attributes = open_confirm.clone().into(); // Create client worker here as channel end must be opened - collect_event(&mut collected, event, mode.clients.enabled, || { - Object::client_from_chan_open_events(&attributes, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.clients.enabled, + || Object::client_from_chan_open_events(&attributes, src_chain).ok(), + ); } IbcEvent::SendPacket(ref packet) => { - collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_send_packet(packet, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.packets.enabled, + || Object::for_send_packet(packet, src_chain).ok(), + ); } IbcEvent::TimeoutPacket(ref packet) => { - collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_timeout_packet(packet, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.packets.enabled, + || Object::for_timeout_packet(packet, src_chain).ok(), + ); } IbcEvent::WriteAcknowledgement(ref packet) => { - collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_write_ack(packet, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.packets.enabled, + || Object::for_write_ack(packet, src_chain).ok(), + ); } IbcEvent::CloseInitChannel(ref packet) => { - collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_close_init_channel(packet, src_chain).ok() - }); + collect_event( + &mut collected, + event_with_height.clone(), + mode.packets.enabled, + || Object::for_close_init_channel(packet, src_chain).ok(), + ); } _ => (), } @@ -619,7 +665,7 @@ fn process_batch( } // Forward the IBC events. - for (object, events) in collected.per_object.into_iter() { + for (object, events_with_heights) in collected.per_object.into_iter() { if !relay_on_object( config, registry, @@ -636,7 +682,7 @@ fn process_batch( continue; } - if events.is_empty() { + if events_with_heights.is_empty() { continue; } @@ -651,12 +697,12 @@ fn process_batch( if let Object::Packet(_path) = object.clone() { // Update telemetry info telemetry!({ - for e in events.clone() { - match e { + for event_with_height in events_with_heights.iter() { + match &event_with_height.event { IbcEvent::SendPacket(send_packet_ev) => { ibc_telemetry::global().send_packet_events( send_packet_ev.packet.sequence.into(), - send_packet_ev.height().revision_height(), + event_with_height.height.revision_height(), &src.id(), &_path.src_channel_id, &_path.src_port_id, @@ -666,7 +712,7 @@ fn process_batch( IbcEvent::WriteAcknowledgement(write_ack_ev) => { ibc_telemetry::global().acknowledgement_events( write_ack_ev.packet.sequence.into(), - write_ack_ev.height().revision_height(), + event_with_height.height.revision_height(), &dst.id(), &_path.src_channel_id, &_path.src_port_id, @@ -691,7 +737,7 @@ fn process_batch( worker.send_events( batch.height, - events, + events_with_heights, batch.chain_id.clone(), batch.tracking_id, ); @@ -747,7 +793,7 @@ pub struct CollectedEvents { /// collected from the [`EventBatch`]. pub new_block: Option, /// Mapping between [`Object`]s and their associated [`IbcEvent`]s. - pub per_object: HashMap>, + pub per_object: HashMap>, /// Unique identifier for tracking this event batch pub tracking_id: TrackingId, } diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index cc345681c1..dee74b46d5 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -190,19 +190,19 @@ pub fn build_and_send_transfer_messages Ok(events), + None => Ok(events_with_heights.into_iter().map(|ev| ev.event).collect()), Some(err) => { - if let IbcEvent::ChainError(err) = err { + if let IbcEvent::ChainError(ref err) = err.event { Err(TransferError::tx_response(err.clone())) } else { panic!( diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index aed739e1aa..ffb31da0e0 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -31,16 +31,16 @@ pub fn spawn_channel_worker( let last_event = batch.events.last(); debug!("starts processing {:#?}", last_event); - if let Some(event) = last_event { + if let Some(event_with_height) = last_event { let mut handshake_channel = RelayChannel::restore_from_event( chains.a.clone(), chains.b.clone(), - event.clone(), + event_with_height.event.clone(), ) .map_err(|e| TaskError::Fatal(RunError::channel(e)))?; retry_with_index(retry_strategy::worker_default_strategy(), |index| { - handshake_channel.step_event(event.clone(), index) + handshake_channel.step_event(&event_with_height.event, index) }) .map_err(|e| TaskError::Fatal(RunError::retry(e))) } else { diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index ab26bae4e3..5dcf160964 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -91,8 +91,8 @@ pub fn detect_misbehavior_task( WorkerCmd::IbcEvents { batch } => { trace!("received batch: {:?}", batch); - for event in batch.events { - if let IbcEvent::UpdateClient(update) = event { + for event_with_height in batch.events { + if let IbcEvent::UpdateClient(ref update) = event_with_height.event { debug!("checking misbehavior for updated client"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(Some(update)); diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 1ddf8912f7..0e144de5a7 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -28,20 +28,20 @@ pub fn spawn_connection_worker( WorkerCmd::IbcEvents { batch } => { // there can be up to two event for this connection, e.g. init and try. // process the last event, the one with highest "rank". - let last_event = batch.events.last(); + let last_event_with_height = batch.events.last(); - debug!("starts processing {:#?}", last_event); + debug!("starts processing {:#?}", last_event_with_height); - if let Some(event) = last_event { + if let Some(event_with_height) = last_event_with_height { let mut handshake_connection = RelayConnection::restore_from_event( chains.a.clone(), chains.b.clone(), - event.clone(), + &event_with_height.event, ) .map_err(|e| TaskError::Fatal(RunError::connection(e)))?; retry_with_index(retry_strategy::worker_default_strategy(), |index| { - handshake_connection.step_event(event.clone(), index) + handshake_connection.step_event(&event_with_height.event, index) }) .map_err(|e| TaskError::Fatal(RunError::retry(e))) } else { diff --git a/relayer/src/worker/handle.rs b/relayer/src/worker/handle.rs index 2ef8565bf5..2871373905 100644 --- a/relayer/src/worker/handle.rs +++ b/relayer/src/worker/handle.rs @@ -8,11 +8,11 @@ use tracing::{debug, trace}; use ibc::{ core::{ics02_client::events::NewBlock, ics24_host::identifier::ChainId}, - events::IbcEvent, Height, }; use crate::chain::tracking::TrackingId; +use crate::event::IbcEventWithHeight; use crate::util::lock::{LockExt, RwArc}; use crate::util::task::TaskHandle; use crate::{event::monitor::EventBatch, object::Object}; @@ -67,7 +67,7 @@ impl WorkerHandle { pub fn send_events( &self, height: Height, - events: Vec, + events: Vec, chain_id: ChainId, tracking_id: TrackingId, ) { diff --git a/tools/integration-test/src/tests/error_events.rs b/tools/integration-test/src/tests/error_events.rs index 5d84c72d36..73299c94d1 100644 --- a/tools/integration-test/src/tests/error_events.rs +++ b/tools/integration-test/src/tests/error_events.rs @@ -52,8 +52,8 @@ impl BinaryChannelTest for ErrorEventsTest { assert_eq!(events.len(), 4); - for event in events { - match event { + for event_with_height in events { + match event_with_height.event { IbcEvent::ChainError(_) => {} _ => { panic!("expect all events to be error events"); diff --git a/tools/test-framework/src/bootstrap/binary/chain.rs b/tools/test-framework/src/bootstrap/binary/chain.rs index fed3347c3e..5ac5be8cc9 100644 --- a/tools/test-framework/src/bootstrap/binary/chain.rs +++ b/tools/test-framework/src/bootstrap/binary/chain.rs @@ -123,7 +123,7 @@ pub fn bootstrap_foreign_client( ForeignClient::restore(ClientId::default(), chain_b.clone(), chain_a.clone()); let event = foreign_client.build_create_client_and_send(client_options)?; - let client_id = extract_client_id(&event)?.clone(); + let client_id = extract_client_id(&event.event)?.clone(); info!( "created foreign client from chain {} to chain {} with client id {} on chain {}", diff --git a/tools/test-framework/src/ibc/denom.rs b/tools/test-framework/src/ibc/denom.rs index 55e1debd0b..f3b1e44227 100644 --- a/tools/test-framework/src/ibc/denom.rs +++ b/tools/test-framework/src/ibc/denom.rs @@ -35,8 +35,7 @@ pub type TaggedDenom = MonoTagged; pub type TaggedDenomRef<'a, Chain> = MonoTagged; /** - A tagged version of [`derive_ibc_denom`](token_transfer::derive_ibc_denom) - from the [`ibc`] module. + A tagged version of `derive_ibc_denom` from the [`ibc`] module. Derives the denom on `ChainB` based on a denom on `ChainA` that has been transferred to `ChainB` via IBC. diff --git a/tools/test-framework/src/relayer/chain.rs b/tools/test-framework/src/relayer/chain.rs index f63ecb437e..77bbf52753 100644 --- a/tools/test-framework/src/relayer/chain.rs +++ b/tools/test-framework/src/relayer/chain.rs @@ -63,6 +63,7 @@ use ibc_relayer::config::ChainConfig; use ibc_relayer::connection::ConnectionMsgType; use ibc_relayer::denom::DenomTrace; use ibc_relayer::error::Error; +use ibc_relayer::event::IbcEventWithHeight; use ibc_relayer::keyring::KeyEntry; use crate::types::tagged::*; @@ -100,7 +101,7 @@ where fn send_messages_and_wait_commit( &self, tracked_msgs: TrackedMsgs, - ) -> Result, Error> { + ) -> Result, Error> { self.value().send_messages_and_wait_commit(tracked_msgs) } @@ -379,7 +380,7 @@ where self.value().query_unreceived_acknowledgements(request) } - fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { + fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { self.value().query_txs(request) } diff --git a/tools/test-framework/src/relayer/connection.rs b/tools/test-framework/src/relayer/connection.rs index 738d9112fc..2207093336 100644 --- a/tools/test-framework/src/relayer/connection.rs +++ b/tools/test-framework/src/relayer/connection.rs @@ -79,7 +79,7 @@ pub fn init_connection( let connection_id = extract_connection_id(&event)?.clone(); - let connection2 = Connection::restore_from_event(handle_b.clone(), handle_a.clone(), event)?; + let connection2 = Connection::restore_from_event(handle_b.clone(), handle_a.clone(), &event)?; Ok((DualTagged::new(connection_id), connection2)) } diff --git a/tools/test-framework/src/relayer/tx.rs b/tools/test-framework/src/relayer/tx.rs index 9f5c1e0171..1cce4ca4f3 100644 --- a/tools/test-framework/src/relayer/tx.rs +++ b/tools/test-framework/src/relayer/tx.rs @@ -118,7 +118,7 @@ pub async fn simple_send_tx( for result in tx_sync_results.iter() { for event in result.events.iter() { - if let IbcEvent::ChainError(e) = event { + if let IbcEvent::ChainError(ref e) = event.event { return Err(Error::generic(eyre!("send_tx result in error: {}", e))); } }