From eabd7736c99cd0c5ad672bcacbd49784f9690c37 Mon Sep 17 00:00:00 2001 From: John Martin Date: Fri, 6 Sep 2024 08:59:24 -0700 Subject: [PATCH] Push metrics bridge (#19077) ## Description This adds the ability for a bridge node operator to push metrics to an external metrics proxy for aggregation. The implementation is mostly copied from https://github.com/MystenLabs/sui/blob/main/crates/sui-node/src/metrics.rs ## Test plan Tested in testnet --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 2 + crates/sui-bridge/Cargo.toml | 2 + crates/sui-bridge/src/config.rs | 10 ++ crates/sui-bridge/src/e2e_tests/test_utils.rs | 1 + crates/sui-bridge/src/main.rs | 6 + crates/sui-bridge/src/metrics.rs | 142 +++++++++++++++++- crates/sui-bridge/src/node.rs | 3 + crates/sui-bridge/src/utils.rs | 1 + 8 files changed, 166 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 37646bccaa50e..9336d9ccb9c4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12889,6 +12889,7 @@ dependencies = [ "serde_json", "serde_with 3.9.0", "shared-crypto", + "snap", "sui-authority-aggregation", "sui-config", "sui-json-rpc-api", @@ -12896,6 +12897,7 @@ dependencies = [ "sui-keys", "sui-sdk 1.34.0", "sui-test-transaction-builder", + "sui-tls", "sui-types", "tap", "telemetry-subscribers", diff --git a/crates/sui-bridge/Cargo.toml b/crates/sui-bridge/Cargo.toml index f17d75df676a9..ff500f371c180 100644 --- a/crates/sui-bridge/Cargo.toml +++ b/crates/sui-bridge/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] ethers = "2.0" +snap = "1.1.0" tokio = { 
workspace = true, features = ["full"] } sui-types.workspace = true sui-authority-aggregation.workspace = true @@ -25,6 +26,7 @@ mysten-metrics.workspace = true sui-sdk.workspace = true sui-keys.workspace = true sui-config.workspace = true +sui-tls.workspace = true clap.workspace = true tracing.workspace = true bin-version.workspace = true diff --git a/crates/sui-bridge/src/config.rs b/crates/sui-bridge/src/config.rs index c94a12737acb0..d437409d0797b 100644 --- a/crates/sui-bridge/src/config.rs +++ b/crates/sui-bridge/src/config.rs @@ -117,12 +117,22 @@ pub struct BridgeNodeConfig { /// Network key used for metrics pushing #[serde(default = "default_ed25519_key_pair")] pub metrics_key_pair: NetworkKeyPair, + #[serde(skip_serializing_if = "Option::is_none")] + pub metrics: Option<MetricsConfig>, } pub fn default_ed25519_key_pair() -> NetworkKeyPair { get_key_pair_from_rng(&mut rand::rngs::OsRng).1 } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct MetricsConfig { + #[serde(skip_serializing_if = "Option::is_none")] + pub push_interval_seconds: Option<u64>, + pub push_url: String, +} + impl Config for BridgeNodeConfig {} impl BridgeNodeConfig { diff --git a/crates/sui-bridge/src/e2e_tests/test_utils.rs b/crates/sui-bridge/src/e2e_tests/test_utils.rs index 2d3458c44334b..1494fd14e155d 100644 --- a/crates/sui-bridge/src/e2e_tests/test_utils.rs +++ b/crates/sui-bridge/src/e2e_tests/test_utils.rs @@ -777,6 +777,7 @@ pub(crate) async fn start_bridge_cluster( sui_bridge_module_last_processed_event_id_override: None, }, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory handles.push( diff --git a/crates/sui-bridge/src/main.rs b/crates/sui-bridge/src/main.rs index 313173ef4d59e..c5501214574b4 100644 --- a/crates/sui-bridge/src/main.rs +++ b/crates/sui-bridge/src/main.rs @@ -9,6 +9,7 @@ use std::{ path::PathBuf, }; use sui_bridge::config::BridgeNodeConfig; +use 
sui_bridge::metrics::start_metrics_push_task; use sui_bridge::node::run_bridge_node; use sui_bridge::server::BridgeNodePublicMetadata; use sui_config::Config; @@ -48,6 +49,11 @@ async fn main() -> anyhow::Result<()> { let metadata = BridgeNodePublicMetadata::new(VERSION.into(), config.metrics_key_pair.public().clone()); + start_metrics_push_task( + &config.metrics, + config.metrics_key_pair.copy(), + registry_service.clone(), + ); Ok(run_bridge_node(config, metadata, prometheus_registry) .await? .await?) diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 3f07ed87a9168..2ec82d5723099 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -1,12 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::config::MetricsConfig; +use mysten_metrics::RegistryService; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, HistogramVec, IntCounter, IntCounterVec, IntGauge, + register_int_gauge_with_registry, Encoder, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use sui_types::crypto::NetworkKeyPair; +use tracing::error; const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, @@ -15,6 +20,141 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 200., 250., 300., 350., 400., ]; +pub struct MetricsPushClient { + certificate: std::sync::Arc<sui_tls::SelfSignedCertificate>, + client: reqwest::Client, +} + +impl MetricsPushClient { + pub fn new(metrics_key: sui_types::crypto::NetworkKeyPair) -> Self { + use fastcrypto::traits::KeyPair; + let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new( + metrics_key.private(), + sui_tls::SUI_VALIDATOR_SERVER_NAME, + )); + let 
identity = certificate.reqwest_identity(); + let client = reqwest::Client::builder() + .identity(identity) + .build() + .unwrap(); + + Self { + certificate, + client, + } + } + + pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate { + &self.certificate + } + + pub fn client(&self) -> &reqwest::Client { + &self.client + } +} + +/// Starts a task to periodically push metrics to a configured endpoint if a metrics push endpoint +/// is configured. +pub fn start_metrics_push_task( + metrics_config: &Option<MetricsConfig>, + metrics_key_pair: NetworkKeyPair, + registry: RegistryService, +) { + use fastcrypto::traits::KeyPair; + + const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60); + + let (interval, url) = match metrics_config { + Some(MetricsConfig { + push_interval_seconds, + push_url: url, + }) => { + let interval = push_interval_seconds + .map(Duration::from_secs) + .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL); + let url = reqwest::Url::parse(url).expect("unable to parse metrics push url"); + (interval, url) + } + _ => return, + }; + + let mut client = MetricsPushClient::new(metrics_key_pair.copy()); + + // TODO (johnm) split this out into mysten-common + async fn push_metrics( + client: &MetricsPushClient, + url: &reqwest::Url, + registry: &RegistryService, + ) -> Result<(), anyhow::Error> { + // now represents a collection timestamp for all of the metrics we send to the proxy + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let mut metric_families = registry.gather_all(); + for mf in metric_families.iter_mut() { + for m in mf.mut_metric() { + m.set_timestamp_ms(now); + } + } + + let mut buf: Vec<u8> = vec![]; + let encoder = prometheus::ProtobufEncoder::new(); + encoder.encode(&metric_families, &mut buf)?; + + let mut s = snap::raw::Encoder::new(); + let compressed = s.compress_vec(&buf).map_err(|err| { + error!("unable to snappy encode; {err}"); + err + })?; + + let response = client + .client() + 
.post(url.to_owned()) + .header(reqwest::header::CONTENT_ENCODING, "snappy") + .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT) + .body(compressed) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = match response.text().await { + Ok(body) => body, + Err(error) => format!("couldn't decode response body; {error}"), + }; + return Err(anyhow::anyhow!( + "metrics push failed: [{}]:{}", + status, + body + )); + } + + tracing::debug!("successfully pushed metrics to {url}"); + + Ok(()) + } + + tokio::spawn(async move { + tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service"); + + let mut interval = tokio::time::interval(interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + if let Err(error) = push_metrics(&client, &url, &registry).await { + tracing::warn!("unable to push metrics: {error}; new client will be created"); + // aggressively recreate our client connection if we hit an error + // since our tick interval is only every min, this should not be racey + client = MetricsPushClient::new(metrics_key_pair.copy()); + } + } + }); +} + #[derive(Clone, Debug)] pub struct BridgeMetrics { pub(crate) err_build_sui_transaction: IntCounter, diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs index 28f6898cb4429..8393a66029213 100644 --- a/crates/sui-bridge/src/node.rs +++ b/crates/sui-bridge/src/node.rs @@ -434,6 +434,7 @@ mod tests { run_client: false, db_path: None, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( @@ -498,6 +499,7 @@ mod tests { run_client: true, db_path: Some(db_path), metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( @@ -573,6 +575,7 @@ mod tests { run_client: true, db_path: Some(db_path), metrics_key_pair: 
default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( diff --git a/crates/sui-bridge/src/utils.rs b/crates/sui-bridge/src/utils.rs index 1d6dc5ba1cdbe..11508554a54e8 100644 --- a/crates/sui-bridge/src/utils.rs +++ b/crates/sui-bridge/src/utils.rs @@ -194,6 +194,7 @@ pub fn generate_bridge_node_config_and_write_to_file( run_client, db_path: None, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; if run_client { config.sui.bridge_client_key_path = Some(PathBuf::from("/path/to/your/bridge_client_key"));