diff --git a/Cargo.lock b/Cargo.lock index 37646bccaa50e..9336d9ccb9c4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12889,6 +12889,7 @@ dependencies = [ "serde_json", "serde_with 3.9.0", "shared-crypto", + "snap", "sui-authority-aggregation", "sui-config", "sui-json-rpc-api", @@ -12896,6 +12897,7 @@ dependencies = [ "sui-keys", "sui-sdk 1.34.0", "sui-test-transaction-builder", + "sui-tls", "sui-types", "tap", "telemetry-subscribers", diff --git a/crates/sui-bridge/Cargo.toml b/crates/sui-bridge/Cargo.toml index f17d75df676a9..ff500f371c180 100644 --- a/crates/sui-bridge/Cargo.toml +++ b/crates/sui-bridge/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] ethers = "2.0" +snap = "1.1.0" tokio = { workspace = true, features = ["full"] } sui-types.workspace = true sui-authority-aggregation.workspace = true @@ -25,6 +26,7 @@ mysten-metrics.workspace = true sui-sdk.workspace = true sui-keys.workspace = true sui-config.workspace = true +sui-tls.workspace = true clap.workspace = true tracing.workspace = true bin-version.workspace = true diff --git a/crates/sui-bridge/src/config.rs b/crates/sui-bridge/src/config.rs index c94a12737acb0..d437409d0797b 100644 --- a/crates/sui-bridge/src/config.rs +++ b/crates/sui-bridge/src/config.rs @@ -117,12 +117,22 @@ pub struct BridgeNodeConfig { /// Network key used for metrics pushing #[serde(default = "default_ed25519_key_pair")] pub metrics_key_pair: NetworkKeyPair, + #[serde(skip_serializing_if = "Option::is_none")] + pub metrics: Option, } pub fn default_ed25519_key_pair() -> NetworkKeyPair { get_key_pair_from_rng(&mut rand::rngs::OsRng).1 } +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct MetricsConfig { + #[serde(skip_serializing_if = "Option::is_none")] + pub push_interval_seconds: Option, + pub push_url: String, +} + impl Config for BridgeNodeConfig {} impl BridgeNodeConfig { diff --git a/crates/sui-bridge/src/e2e_tests/test_utils.rs b/crates/sui-bridge/src/e2e_tests/test_utils.rs index 2d3458c44334b..1494fd14e155d 100644 --- a/crates/sui-bridge/src/e2e_tests/test_utils.rs +++ b/crates/sui-bridge/src/e2e_tests/test_utils.rs @@ -777,6 +777,7 @@ pub(crate) async fn start_bridge_cluster( sui_bridge_module_last_processed_event_id_override: None, }, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory handles.push( diff --git a/crates/sui-bridge/src/main.rs b/crates/sui-bridge/src/main.rs index 313173ef4d59e..c5501214574b4 100644 --- a/crates/sui-bridge/src/main.rs +++ b/crates/sui-bridge/src/main.rs @@ -9,6 +9,7 @@ use std::{ path::PathBuf, }; use sui_bridge::config::BridgeNodeConfig; +use sui_bridge::metrics::start_metrics_push_task; use sui_bridge::node::run_bridge_node; use sui_bridge::server::BridgeNodePublicMetadata; use sui_config::Config; @@ -48,6 +49,11 @@ async fn main() -> anyhow::Result<()> { let metadata = BridgeNodePublicMetadata::new(VERSION.into(), config.metrics_key_pair.public().clone()); + start_metrics_push_task( + &config.metrics, + config.metrics_key_pair.copy(), + registry_service.clone(), + ); Ok(run_bridge_node(config, metadata, prometheus_registry) .await? .await?) diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 3f07ed87a9168..2ec82d5723099 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -1,12 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::config::MetricsConfig; +use mysten_metrics::RegistryService; use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_counter_with_registry, register_int_gauge_vec_with_registry, - register_int_gauge_with_registry, HistogramVec, IntCounter, IntCounterVec, IntGauge, + register_int_gauge_with_registry, Encoder, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry, }; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use sui_types::crypto::NetworkKeyPair; +use tracing::error; const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 0.001, 0.005, 0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, @@ -15,6 +20,141 @@ const FINE_GRAINED_LATENCY_SEC_BUCKETS: &[f64] = &[ 200., 250., 300., 350., 400., ]; +pub struct MetricsPushClient { + certificate: std::sync::Arc, + client: reqwest::Client, +} + +impl MetricsPushClient { + pub fn new(metrics_key: sui_types::crypto::NetworkKeyPair) -> Self { + use fastcrypto::traits::KeyPair; + let certificate = std::sync::Arc::new(sui_tls::SelfSignedCertificate::new( + metrics_key.private(), + sui_tls::SUI_VALIDATOR_SERVER_NAME, + )); + let identity = certificate.reqwest_identity(); + let client = reqwest::Client::builder() + .identity(identity) + .build() + .unwrap(); + + Self { + certificate, + client, + } + } + + pub fn certificate(&self) -> &sui_tls::SelfSignedCertificate { + &self.certificate + } + + pub fn client(&self) -> &reqwest::Client { + &self.client + } +} + +/// Starts a task to periodically push metrics to a configured endpoint if a metrics push endpoint +/// is configured. +pub fn start_metrics_push_task( + metrics_config: &Option, + metrics_key_pair: NetworkKeyPair, + registry: RegistryService, +) { + use fastcrypto::traits::KeyPair; + + const DEFAULT_METRICS_PUSH_INTERVAL: Duration = Duration::from_secs(60); + + let (interval, url) = match metrics_config { + Some(MetricsConfig { + push_interval_seconds, + push_url: url, + }) => { + let interval = push_interval_seconds + .map(Duration::from_secs) + .unwrap_or(DEFAULT_METRICS_PUSH_INTERVAL); + let url = reqwest::Url::parse(url).expect("unable to parse metrics push url"); + (interval, url) + } + _ => return, + }; + + let mut client = MetricsPushClient::new(metrics_key_pair.copy()); + + // TODO (johnm) split this out into mysten-common + async fn push_metrics( + client: &MetricsPushClient, + url: &reqwest::Url, + registry: &RegistryService, + ) -> Result<(), anyhow::Error> { + // now represents a collection timestamp for all of the metrics we send to the proxy + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + + let mut metric_families = registry.gather_all(); + for mf in metric_families.iter_mut() { + for m in mf.mut_metric() { + m.set_timestamp_ms(now); + } + } + + let mut buf: Vec = vec![]; + let encoder = prometheus::ProtobufEncoder::new(); + encoder.encode(&metric_families, &mut buf)?; + + let mut s = snap::raw::Encoder::new(); + let compressed = s.compress_vec(&buf).map_err(|err| { + error!("unable to snappy encode; {err}"); + err + })?; + + let response = client + .client() + .post(url.to_owned()) + .header(reqwest::header::CONTENT_ENCODING, "snappy") + .header(reqwest::header::CONTENT_TYPE, prometheus::PROTOBUF_FORMAT) + .body(compressed) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = match response.text().await { + Ok(body) => body, + Err(error) => format!("couldn't decode response body; {error}"), + }; + return Err(anyhow::anyhow!( + "metrics push failed: [{}]:{}", + status, + body + )); + } + + tracing::debug!("successfully pushed metrics to {url}"); + + Ok(()) + } + + tokio::spawn(async move { + tracing::info!(push_url =% url, interval =? interval, "Started Metrics Push Service"); + + let mut interval = tokio::time::interval(interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + if let Err(error) = push_metrics(&client, &url, ®istry).await { + tracing::warn!("unable to push metrics: {error}; new client will be created"); + // aggressively recreate our client connection if we hit an error + // since our tick interval is only every min, this should not be racey + client = MetricsPushClient::new(metrics_key_pair.copy()); + } + } + }); +} + #[derive(Clone, Debug)] pub struct BridgeMetrics { pub(crate) err_build_sui_transaction: IntCounter, diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs index 28f6898cb4429..8393a66029213 100644 --- a/crates/sui-bridge/src/node.rs +++ b/crates/sui-bridge/src/node.rs @@ -434,6 +434,7 @@ mod tests { run_client: false, db_path: None, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( @@ -498,6 +499,7 @@ mod tests { run_client: true, db_path: Some(db_path), metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( @@ -573,6 +575,7 @@ mod tests { run_client: true, db_path: Some(db_path), metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; // Spawn bridge node in memory let _handle = run_bridge_node( diff --git a/crates/sui-bridge/src/utils.rs b/crates/sui-bridge/src/utils.rs index 1d6dc5ba1cdbe..11508554a54e8 100644 --- a/crates/sui-bridge/src/utils.rs +++ b/crates/sui-bridge/src/utils.rs @@ -194,6 +194,7 @@ pub fn generate_bridge_node_config_and_write_to_file( run_client, db_path: None, metrics_key_pair: default_ed25519_key_pair(), + metrics: None, }; if run_client { config.sui.bridge_client_key_path = Some(PathBuf::from("/path/to/your/bridge_client_key"));