Skip to content

Commit

Permalink
Metric updates when timeout occurs (informalsystems#2457)
Browse files Browse the repository at this point in the history
* clears telemetry sequence history on timeout

* change description to incorporate the fact that oldest can also be waiting for a timeout

* Delete json_encoder.rs

* update guide

* replace match by if let and replace e by event

* fmt

* changelog entry

* Rename 2429-fix-oldest-sequence-timeout.md to 2451-fix-oldest-sequence-timeout.md

* Update 2451-fix-oldest-sequence-timeout.md

* Update 2451-fix-oldest-sequence-timeout.md

* Renamed oldest_* metrics to backlog_*, refactored assoc. methods

* Fix for informalsystems#2467 w/ Ali + refactor

Co-authored-by: Adi Seredinschi <adi@informal.systems>
  • Loading branch information
AlianBenabdallah and adizere committed Jul 28, 2022
1 parent e63adea commit 3db5ee0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Ensure `oldest_sequence` and `oldest_timestamp` metrics are updated when a timeout occurs
([#2451](https://github.com/informalsystems/ibc-rs/issues/2451))
6 changes: 3 additions & 3 deletions guide/src/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ The following table describes the metrics currently tracked by the telemetry ser
| `acknowledgement_count` | Number of WriteAcknowledgement events processed  | `u64` Counter |
| `cleared_send_packet_count`  | Number of SendPacket events processed during the initial and periodic clearing | `u64` Counter  |
| `cleared_acknowledgment_count` | Number of WriteAcknowledgement events processed during the initial and periodic clearing | `u64` Counter  |
| `oldest_sequence` | The sequence number of the oldest SendPacket event observed without its corresponding WriteAcknowledgement event. If this value is 0, it means Hermes observed a WriteAcknowledgment event for all the SendPacket events | `u64` ValueRecorder |
| `oldest_timestamp` | The timestamp of the oldest sequence number in seconds | `u64` ValueRecorder |
| `backlog_oldest_sequence` | Sequence number of the oldest pending packet in the backlog, per channel | `u64` ValueRecorder |
| `backlog_oldest_timestamp` | Local timestamp for the oldest pending packet in the backlog, per channel | `u64` ValueRecorder |
| `backlog_size` | Total number of pending packets, per channel | `u64` ValueRecorder |

## Integration with Prometheus

Expand Down Expand Up @@ -188,4 +189,3 @@ workers{type="wallet"} 2
ws_events{chain="ibc-0"} 57
ws_events{chain="ibc-1"} 37
```

19 changes: 13 additions & 6 deletions relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
// Update telemetry info
telemetry!({
for e in events.events() {
self.record_send_packet_and_acknowledgment_history(e);
self.backlog_update(e);
}
});

Expand Down Expand Up @@ -1737,28 +1737,35 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

#[cfg(feature = "telemetry")]
fn record_send_packet_and_acknowledgment_history(&self, event: &IbcEvent) {
fn backlog_update(&self, event: &IbcEvent) {
match event {
IbcEvent::SendPacket(send_packet_ev) => {
ibc_telemetry::global().record_send_history(
ibc_telemetry::global().backlog_insert(
send_packet_ev.packet.sequence.into(),
send_packet_ev.height().revision_height(),
&self.src_chain().id(),
self.src_channel_id(),
self.src_port_id(),
&self.dst_chain().id(),
);
}
IbcEvent::WriteAcknowledgement(write_ack_ev) => {
ibc_telemetry::global().record_ack_history(
ibc_telemetry::global().backlog_remove(
write_ack_ev.packet.sequence.into(),
write_ack_ev.height().revision_height(),
&self.dst_chain().id(),
self.dst_channel_id(),
self.dst_port_id(),
&self.src_chain().id(),
);
}
IbcEvent::TimeoutPacket(timeout_packet) => {
ibc_telemetry::global().backlog_remove(
timeout_packet.packet.sequence.into(),
&self.src_chain().id(),
self.src_channel_id(),
self.src_port_id(),
&self.dst_chain().id(),
);
}
_ => {}
}
}
Expand Down
5 changes: 2 additions & 3 deletions telemetry/src/path_identifier.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
/// Structure used by the telemetry in order to define a UID
/// to track the SendPacket and WriteAcknowledgement for a given
/// to track the SendPacket, WriteAcknowledgement and Timeout events for a given
/// chain, channel and port.

#[derive(Hash, PartialEq, Eq)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct PathIdentifier {
chain_id: String,
channel_id: String,
Expand Down
174 changes: 108 additions & 66 deletions telemetry/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::fmt;
use std::time::{Duration, Instant};

use dashmap::DashMap;
use opentelemetry::{
global,
metrics::{Counter, UpDownCounter, ValueRecorder},
Expand All @@ -12,13 +13,11 @@ use prometheus::proto::MetricFamily;
use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ClientId, PortId};
use tendermint::Time;

use dashmap::DashMap;

use crate::path_identifier::PathIdentifier;

const NO_PENDING_PACKETS: u64 = 0;
const HISTORY_SET_CAPACITY: usize = 1000;
const HISTORY_RESET_THRESHOLD: usize = 900;
const EMPTY_BACKLOG_SYMBOL: u64 = 0;
const BACKLOG_CAPACITY: usize = 1000;
const BACKLOG_RESET_THRESHOLD: usize = 900;

#[derive(Copy, Clone, Debug)]
pub enum WorkerType {
Expand Down Expand Up @@ -94,10 +93,10 @@ pub struct TelemetryState {
/// Used for computing the `tx_latency` metric.
in_flight_events: moka::sync::Cache<String, Instant>,

/// Counts the number of SendPacket Hermes transfers.
/// Counts the number of SendPacket Hermes relays.
send_packet_count: Counter<u64>,

/// Counts the number of WriteAcknowledgement Hermes transfers.
/// Counts the number of WriteAcknowledgement Hermes relays.
acknowledgement_count: Counter<u64>,

/// Counts the number of SendPacket events Hermes processes from ClearPendingPackets.
Expand All @@ -106,17 +105,27 @@ pub struct TelemetryState {
/// Counts the number of WriteAcknowledgement events Hermes processes from ClearPendingPackets.
cleared_acknowledgment_count: Counter<u64>,

/// Records the sequence number of the oldest SendPacket for which no
/// WriteAcknowledgement has been received. The value is 0 if all the
/// WriteAcknowledgement were received.
oldest_sequence: ValueRecorder<u64>,
/// Records the sequence number of the oldest pending packet. This corresponds to
/// the sequence number of the oldest SendPacket event for which no
/// WriteAcknowledgement or Timeout events have been received. The value is 0 if all the
/// SendPacket events were relayed.
backlog_oldest_sequence: ValueRecorder<u64>,

/// Record the timestamp related to the oldest sequence number.
/// Record the timestamp related to `backlog_oldest_sequence`.
/// The timestamp is the time passed since the unix epoch in seconds.
oldest_timestamp: ValueRecorder<u64>,

/// History of SendPacket sequence numbers received and not yet Acknowledged.
sequences_histories: DashMap<PathIdentifier, DashMap<u64, u64>>,
backlog_oldest_timestamp: ValueRecorder<u64>,

/// Records the length of the backlog, i.e., how many packets are pending.
backlog_size: ValueRecorder<u64>,

/// Stores the backlogs for all the paths the relayer is active on.
/// This is a map of multiple inner backlogs, one inner backlog per path.
///
/// Each inner backlog is represented as a [`DashMap`].
/// Each inner backlog captures the sequence numbers & timestamp for all SendPacket events
/// that the relayer observed, and for which there was no associated Acknowledgement or
/// Timeout event.
backlogs: DashMap<PathIdentifier, DashMap<u64, u64>>,
}

impl TelemetryState {
Expand Down Expand Up @@ -398,17 +407,18 @@ impl TelemetryState {
self.cleared_acknowledgment_count.add(1, labels);
}

pub fn record_send_history(
/// Inserts in the backlog a new event for the given sequence number.
/// This happens when the relayer observed a new SendPacket event.
pub fn backlog_insert(
&self,
seq_nr: u64,
_height: u64,
chain_id: &ChainId,
channel_id: &ChannelId,
port_id: &PortId,
counterparty_chain_id: &ChainId,
) {
// Unique Identifier for a chain/channel/port.
let uid: PathIdentifier = PathIdentifier::new(
// Unique identifier for a chain/channel/port.
let path_uid: PathIdentifier = PathIdentifier::new(
chain_id.to_string(),
channel_id.to_string(),
port_id.to_string(),
Expand All @@ -421,53 +431,73 @@ impl TelemetryState {
KeyValue::new("port", port_id.to_string()),
];

// Retrieve timestamp for recieved SendPacket.
// Retrieve local timestamp when this SendPacket event was recorded.
let now = Time::now();
let timestamp = match now.duration_since(Time::unix_epoch()) {
Ok(ts) => ts.as_secs(),
Err(_) => 0,
};

// If there are no HashSet for this uid, create a new one.
// Else update the min value.
if let Some(set) = self.sequences_histories.get(&uid) {
// Avoid having the DashSet growing more than a given threshold, by removing
// Update the backlog with the incoming data and retrieve the oldest values
let (oldest_sn, oldest_ts, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid)
{
// Avoid having the inner backlog map growing more than a given threshold, by removing
// the oldest sequence number entry.
if set.len() > HISTORY_RESET_THRESHOLD {
if let Some(min) = set.iter().map(|v| *v.key()).min() {
set.remove(&min);
if path_backlog.len() > BACKLOG_RESET_THRESHOLD {
if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
path_backlog.remove(&min);
}
}
set.insert(seq_nr, timestamp);
// Record the min of the HashSet as the oldest sequence.
if let Some(min) = set.iter().map(|v| *v.key()).min() {
// Updated oldest sequence number and add associated timestamp to labels.
self.oldest_sequence.record(min, labels);
self.oldest_timestamp.record(timestamp, labels);
path_backlog.insert(seq_nr, timestamp);

// Return the oldest event information to be recorded in telemetry
if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() {
if let Some(oldest) = path_backlog.get(&min) {
(min, *oldest.value(), path_backlog.len() as u64)
} else {
// Timestamp was not found, this should not happen, record a 0 ts.
(min, 0, path_backlog.len() as u64)
}
} else {
// We just inserted a new key/value, so this else branch is unlikely to activate,
// but it can happen in case of concurrent updates to the backlog.
(
EMPTY_BACKLOG_SYMBOL,
EMPTY_BACKLOG_SYMBOL,
EMPTY_BACKLOG_SYMBOL,
)
}
} else {
let new_dashmap = DashMap::with_capacity(HISTORY_SET_CAPACITY);
new_dashmap.insert(seq_nr, timestamp);

// Updated oldest sequence number and add associated timestamp to labels.
self.oldest_sequence.record(seq_nr, labels);
self.oldest_timestamp.record(timestamp, labels);
// If there is no inner backlog for this path, create a new map to store it.
let new_path_backlog = DashMap::with_capacity(BACKLOG_CAPACITY);
new_path_backlog.insert(seq_nr, timestamp);
// Record it in the global backlog
self.backlogs.insert(path_uid, new_path_backlog);

// Return the current event information to be recorded in telemetry
(seq_nr, timestamp, 1)
};

self.sequences_histories.insert(uid, new_dashmap);
}
// Update metrics to reflect the new state of the backlog
self.backlog_oldest_sequence.record(oldest_sn, labels);
self.backlog_oldest_timestamp.record(oldest_ts, labels);
self.backlog_size.record(total, labels);
}

pub fn record_ack_history(
/// Evicts from the backlog the event for the given sequence number.
/// Removing events happens when the relayer observed either an acknowledgment
/// or a timeout for a packet sequence number, which means that the corresponding
/// packet was relayed.
pub fn backlog_remove(
&self,
seq_nr: u64,
_height: u64,
chain_id: &ChainId,
channel_id: &ChannelId,
port_id: &PortId,
counterparty_chain_id: &ChainId,
) {
// Unique Identifier for a chain/channel/port.
let uid: PathIdentifier = PathIdentifier::new(
// Unique identifier for a chain/channel/port path.
let path_uid: PathIdentifier = PathIdentifier::new(
chain_id.to_string(),
channel_id.to_string(),
port_id.to_string(),
Expand All @@ -480,23 +510,29 @@ impl TelemetryState {
KeyValue::new("port", port_id.to_string()),
];

// If there are no HashSet for this uid, create a new one.
if let Some(set) = self.sequences_histories.get(&uid) {
match set.remove(&seq_nr) {
if let Some(path_backlog) = self.backlogs.get(&path_uid) {
match path_backlog.remove(&seq_nr) {
Some(_) => {
// Record the min of the HashSet as the oldest sequence.
if let Some(min) = set.iter().map(|v| *v.key()).min() {
if let Some(timestamp) = set.get(&min) {
self.oldest_timestamp.record(*timestamp, labels);
// The oldest pending sequence number is the minimum key in the inner (path) backlog.
if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() {
if let Some(oldest) = path_backlog.get(&min_key) {
self.backlog_oldest_timestamp
.record(*oldest.value(), labels);
} else {
self.oldest_timestamp.record(0, labels);
self.backlog_oldest_timestamp.record(0, labels);
}
self.oldest_sequence.record(min, labels);
self.backlog_oldest_sequence.record(min_key, labels);
self.backlog_size.record(path_backlog.len() as u64, labels);
} else {
self.oldest_sequence.record(NO_PENDING_PACKETS, labels);
self.oldest_timestamp.record(0, labels);
// No minimum found, update the metrics to reflect an empty backlog
self.backlog_oldest_sequence
.record(EMPTY_BACKLOG_SYMBOL, labels);
self.backlog_oldest_timestamp
.record(EMPTY_BACKLOG_SYMBOL, labels);
self.backlog_size.record(EMPTY_BACKLOG_SYMBOL, labels);
}
}
// No change performed to the backlog, no need to update the metrics.
None => {}
}
}
Expand All @@ -516,8 +552,9 @@ impl AggregatorSelector for CustomAggregatorSelector {
fn aggregator_for(&self, descriptor: &Descriptor) -> Option<Arc<dyn Aggregator + Send + Sync>> {
match descriptor.name() {
"wallet_balance" => Some(Arc::new(last_value())),
"oldest_sequence" => Some(Arc::new(last_value())),
"oldest_timestamp" => Some(Arc::new(last_value())),
"backlog_oldest_sequence" => Some(Arc::new(last_value())),
"backlog_oldest_timestamp" => Some(Arc::new(last_value())),
"backlog_size" => Some(Arc::new(last_value())),
// Prometheus only supports collectors for histogram, sum, and last value aggregators.
// https://docs.rs/opentelemetry-prometheus/0.10.0/src/opentelemetry_prometheus/lib.rs.html#411-418
// TODO: Once quantile sketches are supported, replace histograms with that.
Expand Down Expand Up @@ -648,17 +685,22 @@ impl Default for TelemetryState {
.time_to_idle(Duration::from_secs(30 * 60)) // Remove entries if they have been idle for 30 minutes
.build(),

sequences_histories: DashMap::new(),
backlogs: DashMap::new(),

oldest_sequence: meter
.u64_value_recorder("oldest_sequence")
.with_description("The sequence number of the oldest SendPacket event observed without its corresponding WriteAcknowledgement event. If this value is 0, it means Hermes observed a WriteAcknowledgment event for all the SendPacket events")
backlog_oldest_sequence: meter
.u64_value_recorder("backlog_oldest_sequence")
.with_description("Sequence number of the oldest pending packet in the backlog, per channel")
.init(),

oldest_timestamp: meter
.u64_value_recorder("oldest_timestamp")
backlog_oldest_timestamp: meter
.u64_value_recorder("backlog_oldest_timestamp")
.with_unit(Unit::new("seconds"))
.with_description("The timestamp of the oldest sequence number in seconds")
.with_description("Local timestamp for the oldest pending packet in the backlog, per channel")
.init(),

backlog_size: meter
.u64_value_recorder("backlog_size")
.with_description("Total number of pending packets, per channel")
.init(),
}
}
Expand Down

0 comments on commit 3db5ee0

Please sign in to comment.