Skip to content

Commit

Permalink
Push metrics bridge (#19077)
Browse files Browse the repository at this point in the history
## Description 

This adds the ability for a bridge node operator to push metrics to an
external metrics proxy for aggregation. The implementation is mostly
copied from
https://github.com/MystenLabs/sui/blob/main/crates/sui-node/src/metrics.rs

## Test plan 

Tested in testnet

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
johnjmartin authored Sep 6, 2024
1 parent a58ad2e commit eabd773
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"

[dependencies]
ethers = "2.0"
snap = "1.1.0"
tokio = { workspace = true, features = ["full"] }
sui-types.workspace = true
sui-authority-aggregation.workspace = true
Expand All @@ -25,6 +26,7 @@ mysten-metrics.workspace = true
sui-sdk.workspace = true
sui-keys.workspace = true
sui-config.workspace = true
sui-tls.workspace = true
clap.workspace = true
tracing.workspace = true
bin-version.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-bridge/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,22 @@ pub struct BridgeNodeConfig {
/// Network key used for metrics pushing
#[serde(default = "default_ed25519_key_pair")]
pub metrics_key_pair: NetworkKeyPair,
#[serde(skip_serializing_if = "Option::is_none")]
pub metrics: Option<MetricsConfig>,
}

pub fn default_ed25519_key_pair() -> NetworkKeyPair {
get_key_pair_from_rng(&mut rand::rngs::OsRng).1
}

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct MetricsConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub push_interval_seconds: Option<u64>,
pub push_url: String,
}

impl Config for BridgeNodeConfig {}

impl BridgeNodeConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge/src/e2e_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ pub(crate) async fn start_bridge_cluster(
sui_bridge_module_last_processed_event_id_override: None,
},
metrics_key_pair: default_ed25519_key_pair(),
metrics: None,
};
// Spawn bridge node in memory
handles.push(
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
path::PathBuf,
};
use sui_bridge::config::BridgeNodeConfig;
use sui_bridge::metrics::start_metrics_push_task;
use sui_bridge::node::run_bridge_node;
use sui_bridge::server::BridgeNodePublicMetadata;
use sui_config::Config;
Expand Down Expand Up @@ -48,6 +49,11 @@ async fn main() -> anyhow::Result<()> {
let metadata =
BridgeNodePublicMetadata::new(VERSION.into(), config.metrics_key_pair.public().clone());

start_metrics_push_task(
&config.metrics,
config.metrics_key_pair.copy(),
registry_service.clone(),
);
Ok(run_bridge_node(config, metadata, prometheus_registry)
.await?
.await?)
Expand Down
142 changes: 141 additions & 1 deletion crates/sui-bridge/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::MetricsConfig;
use mysten_metrics::RegistryService;
use prometheus::{
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_counter_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, HistogramVec, IntCounter, IntCounterVec, IntGauge,
register_int_gauge_with_registry, Encoder, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec, Registry,
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sui_types::crypto::NetworkKeyPair;
use tracing::error;

const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9,
Expand All @@ -15,6 +20,141 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[
200., 250., 300., 350., 400.,
];

pub struct MetricsPushClient {
certificate: std::sync::Arc<sui_tls::SelfSignedCertificate>,
client: reqwest::Client,
}

impl MetricsPushClient {
pub fn new(metrics_key: sui_types::crypto::NetworkKeyPair) -> Self {
use fastcrypto::traits::KeyPair;
let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new(
metrics_key.private(),
sui_tls::SUI_VALIDATOR_SERVER_NAME,
));
let identity = certificate.reqwest_identity();
let client = reqwest::Client::builder()
.identity(identity)
.build()
.unwrap();

Self {
certificate,
client,
}
}

pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate {
&self.certificate
}

pub fn client(&self) -> &reqwest::Client {
&self.client
}
}

/// Starts a task to periodically push metrics to a configured endpoint if a metrics push endpoint
/// is configured.
pub fn start_metrics_push_task(
metrics_config: &Option<MetricsConfig>,
metrics_key_pair: NetworkKeyPair,
registry: RegistryService,
) {
use fastcrypto::traits::KeyPair;

const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60);

let (interval, url) = match metrics_config {
Some(MetricsConfig {
push_interval_seconds,
push_url: url,
}) => {
let interval = push_interval_seconds
.map(Duration::from_secs)
.unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL);
let url = reqwest::Url::parse(url).expect("unable to parse metrics push url");
(interval, url)
}
_ => return,
};

let mut client = MetricsPushClient::new(metrics_key_pair.copy());

// TODO (johnm) split this out into mysten-common
async fn push_metrics(
client: &MetricsPushClient,
url: &reqwest::Url,
registry: &RegistryService,
) -> Result<(), anyhow::Error> {
// now represents a collection timestamp for all of the metrics we send to the proxy
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;

let mut metric_families = registry.gather_all();
for mf in metric_families.iter_mut() {
for m in mf.mut_metric() {
m.set_timestamp_ms(now);
}
}

let mut buf: Vec<u8> = vec![];
let encoder = prometheus::ProtobufEncoder::new();
encoder.encode(&metric_families, &mut buf)?;

let mut s = snap::raw::Encoder::new();
let compressed = s.compress_vec(&buf).map_err(|err| {
error!("unable to snappy encode; {err}");
err
})?;

let response = client
.client()
.post(url.to_owned())
.header(reqwest::header::CONTENT_ENCODING, "snappy")
.header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT)
.body(compressed)
.send()
.await?;

if !response.status().is_success() {
let status = response.status();
let body = match response.text().await {
Ok(body) => body,
Err(error) => format!("couldn't decode response body; {error}"),
};
return Err(anyhow::anyhow!(
"metrics push failed: [{}]:{}",
status,
body
));
}

tracing::debug!("successfully pushed metrics to {url}");

Ok(())
}

tokio::spawn(async move {
tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service");

let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;

if let Err(error) = push_metrics(&client, &url, &registry).await {
tracing::warn!("unable to push metrics: {error}; new client will be created");
// aggressively recreate our client connection if we hit an error
// since our tick interval is only every min, this should not be racey
client = MetricsPushClient::new(metrics_key_pair.copy());
}
}
});
}

#[derive(Clone, Debug)]
pub struct BridgeMetrics {
pub(crate) err_build_sui_transaction: IntCounter,
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-bridge/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ mod tests {
run_client: false,
db_path: None,
metrics_key_pair: default_ed25519_key_pair(),
metrics: None,
};
// Spawn bridge node in memory
let _handle = run_bridge_node(
Expand Down Expand Up @@ -498,6 +499,7 @@ mod tests {
run_client: true,
db_path: Some(db_path),
metrics_key_pair: default_ed25519_key_pair(),
metrics: None,
};
// Spawn bridge node in memory
let _handle = run_bridge_node(
Expand Down Expand Up @@ -573,6 +575,7 @@ mod tests {
run_client: true,
db_path: Some(db_path),
metrics_key_pair: default_ed25519_key_pair(),
metrics: None,
};
// Spawn bridge node in memory
let _handle = run_bridge_node(
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub fn generate_bridge_node_config_and_write_to_file(
run_client,
db_path: None,
metrics_key_pair: default_ed25519_key_pair(),
metrics: None,
};
if run_client {
config.sui.bridge_client_key_path = Some(PathBuf::from("/path/to/your/bridge_client_key"));
Expand Down

0 comments on commit eabd773

Please sign in to comment.