diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 24c1248304e1..aca22c6b3eef 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -26,13 +26,14 @@ use futures::stream::Stream; use futures_util::StreamExt; use futures_util::TryStreamExt; use http_types::{StatusCode, Url}; +use scopeguard::ScopeGuard; use tokio_util::sync::CancellationToken; use tracing::debug; +use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind}; use crate::{ - error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download, - DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata, - TimeTravelError, TimeoutOrCancel, + error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, + ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel, }; pub struct AzureBlobStorage { @@ -137,6 +138,8 @@ impl AzureBlobStorage { let mut last_modified = None; let mut metadata = HashMap::new(); + let started_at = start_measuring_requests(kind); + let download = async { let response = builder // convert to concrete Pageable @@ -200,13 +203,22 @@ impl AzureBlobStorage { }) }; - tokio::select! { + let download = tokio::select! { bufs = download => bufs, cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout { - TimeoutOrCancel::Timeout => Err(DownloadError::Timeout), - TimeoutOrCancel::Cancel => Err(DownloadError::Cancelled), + TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout), + TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled), }, - } + }; + let started_at = ScopeGuard::into_inner(started_at); + let outcome = match &download { + Ok(_) => AttemptOutcome::Ok, + Err(_) => AttemptOutcome::Err, + }; + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, outcome, started_at); + download } async fn permit( @@ -340,7 +352,10 @@ impl RemoteStorage for AzureBlobStorage { metadata: Option, cancel: &CancellationToken, ) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Put, cancel).await?; + let kind = RequestKind::Put; + let _permit = self.permit(kind, cancel).await?; + + let started_at = start_measuring_requests(kind); let op = async { let blob_client = self.client.blob_client(self.relative_path_to_name(to)); @@ -364,14 +379,25 @@ impl RemoteStorage for AzureBlobStorage { match fut.await { Ok(Ok(_response)) => Ok(()), Ok(Err(azure)) => Err(azure.into()), - Err(_timeout) => Err(TimeoutOrCancel::Cancel.into()), + Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()), } }; - tokio::select! { + let res = tokio::select! { res = op => res, - _ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()), - } + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let outcome = match res { + Ok(_) => AttemptOutcome::Ok, + Err(_) => AttemptOutcome::Err, + }; + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, outcome, started_at); + + res } async fn download( @@ -417,12 +443,13 @@ impl RemoteStorage for AzureBlobStorage { paths: &'a [RemotePath], cancel: &CancellationToken, ) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Delete, cancel).await?; + let kind = RequestKind::Delete; + let _permit = self.permit(kind, cancel).await?; + let started_at = start_measuring_requests(kind); let op = async { - // TODO batch requests are also not supported by the SDK + // TODO batch requests are not supported by the SDK // https://github.com/Azure/azure-sdk-for-rust/issues/1068 - // https://github.com/Azure/azure-sdk-for-rust/issues/1249 for path in paths { let blob_client = self.client.blob_client(self.relative_path_to_name(path)); @@ -447,10 +474,16 @@ impl RemoteStorage for AzureBlobStorage { Ok(()) }; - tokio::select! { + let res = tokio::select! { res = op => res, - _ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()), - } + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + res } async fn copy( @@ -459,7 +492,9 @@ impl RemoteStorage for AzureBlobStorage { to: &RemotePath, cancel: &CancellationToken, ) -> anyhow::Result<()> { - let _permit = self.permit(RequestKind::Copy, cancel).await?; + let kind = RequestKind::Copy; + let _permit = self.permit(kind, cancel).await?; + let started_at = start_measuring_requests(kind); let timeout = tokio::time::sleep(self.timeout); @@ -503,15 +538,21 @@ impl RemoteStorage for AzureBlobStorage { } }; - tokio::select! { + let res = tokio::select! { res = op => res, - _ = cancel.cancelled() => Err(anyhow::Error::new(TimeoutOrCancel::Cancel)), + _ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)), _ = timeout => { let e = anyhow::Error::new(TimeoutOrCancel::Timeout); let e = e.context(format!("Timeout, last status: {copy_status:?}")); Err(e) }, - } + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + res } async fn time_travel_recover( diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index cb3df0985d61..8c984abed201 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -12,6 +12,7 @@ mod azure_blob; mod error; mod local_fs; +mod metrics; mod s3_bucket; mod simulate_failures; mod support; diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/metrics.rs similarity index 76% rename from libs/remote_storage/src/s3_bucket/metrics.rs rename to libs/remote_storage/src/metrics.rs index beca75592052..bbb51590f344 100644 --- a/libs/remote_storage/src/s3_bucket/metrics.rs +++ b/libs/remote_storage/src/metrics.rs @@ -15,6 +15,7 @@ pub(crate) enum RequestKind { TimeTravel = 5, } +use scopeguard::ScopeGuard; use RequestKind::*; impl RequestKind { @@ -33,10 +34,10 @@ impl RequestKind { } } -pub(super) struct RequestTyped([C; 6]); +pub(crate) struct RequestTyped([C; 6]); impl RequestTyped { - pub(super) fn get(&self, kind: RequestKind) -> &C { + pub(crate) fn get(&self, kind: RequestKind) -> &C { &self.0[kind.as_index()] } @@ -58,19 +59,19 @@ impl RequestTyped { } impl RequestTyped { - pub(super) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) { + pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) { self.get(kind).observe(started_at.elapsed().as_secs_f64()) } } -pub(super) struct PassFailCancelledRequestTyped { +pub(crate) struct PassFailCancelledRequestTyped { success: RequestTyped, fail: RequestTyped, cancelled: RequestTyped, } #[derive(Debug, Clone, Copy)] -pub(super) enum AttemptOutcome { +pub(crate) enum AttemptOutcome { Ok, Err, Cancelled, @@ -86,7 +87,7 @@ impl From<&Result> for AttemptOutcome { } impl AttemptOutcome { - pub(super) fn as_str(&self) -> &'static str { + pub(crate) fn as_str(&self) -> &'static str { match self { AttemptOutcome::Ok => "ok", AttemptOutcome::Err => "err", @@ -96,7 +97,7 @@ impl AttemptOutcome { } impl PassFailCancelledRequestTyped { - pub(super) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C { + pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C { let target = match outcome { AttemptOutcome::Ok => &self.success, AttemptOutcome::Err => &self.fail, @@ -119,7 +120,7 @@ impl PassFailCancelledRequestTyped { } impl PassFailCancelledRequestTyped { - pub(super) fn observe_elapsed( + pub(crate) fn observe_elapsed( &self, kind: RequestKind, outcome: impl Into, @@ -130,19 +131,44 @@ impl PassFailCancelledRequestTyped { } } -pub(super) struct BucketMetrics { +/// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`]. +pub(crate) fn start_counting_cancelled_wait( + kind: RequestKind, +) -> ScopeGuard { + scopeguard::guard_on_success(std::time::Instant::now(), move |_| { + crate::metrics::BUCKET_METRICS + .cancelled_waits + .get(kind) + .inc() + }) +} + +/// On drop (cancellation) add time to [`BucketMetrics::req_seconds`]. +pub(crate) fn start_measuring_requests( + kind: RequestKind, +) -> ScopeGuard { + scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| { + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Cancelled, + started_at, + ) + }) +} + +pub(crate) struct BucketMetrics { /// Full request duration until successful completion, error or cancellation. - pub(super) req_seconds: PassFailCancelledRequestTyped, + pub(crate) req_seconds: PassFailCancelledRequestTyped, /// Total amount of seconds waited on queue. - pub(super) wait_seconds: RequestTyped, + pub(crate) wait_seconds: RequestTyped, /// Track how many semaphore awaits were cancelled per request type. /// /// This is in case cancellations are happening more than expected. - pub(super) cancelled_waits: RequestTyped, + pub(crate) cancelled_waits: RequestTyped, /// Total amount of deleted objects in batches or single requests. - pub(super) deleted_objects_total: IntCounter, + pub(crate) deleted_objects_total: IntCounter, } impl Default for BucketMetrics { diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index c3d6c75e20da..76cf3eac80ef 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -46,15 +46,16 @@ use utils::backoff; use super::StorageMetadata; use crate::{ - error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError, - Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel, - MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, + error::Cancelled, + metrics::{start_counting_cancelled_wait, start_measuring_requests}, + support::PermitCarrying, + ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, + S3Config, TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, + REMOTE_STORAGE_PREFIX_SEPARATOR, }; -pub(super) mod metrics; - -use self::metrics::AttemptOutcome; -pub(super) use self::metrics::RequestKind; +use crate::metrics::AttemptOutcome; +pub(super) use crate::metrics::RequestKind; /// AWS S3 storage. pub struct S3Bucket { @@ -227,7 +228,7 @@ impl S3Bucket { }; let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .wait_seconds .observe_elapsed(kind, started_at); @@ -248,7 +249,7 @@ impl S3Bucket { }; let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .wait_seconds .observe_elapsed(kind, started_at); Ok(permit) @@ -287,7 +288,7 @@ impl S3Bucket { // Count this in the AttemptOutcome::Ok bucket, because 404 is not // an error: we expect to sometimes fetch an object and find it missing, // e.g. when probing for timeline indices. - metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( kind, AttemptOutcome::Ok, started_at, @@ -295,7 +296,7 @@ impl S3Bucket { return Err(DownloadError::NotFound); } Err(e) => { - metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( kind, AttemptOutcome::Err, started_at, @@ -371,12 +372,12 @@ impl S3Bucket { }; let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .req_seconds .observe_elapsed(kind, &resp, started_at); let resp = resp.context("request deletion")?; - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .deleted_objects_total .inc_by(chunk.len() as u64); @@ -435,14 +436,14 @@ pin_project_lite::pin_project! { /// Times and tracks the outcome of the request. struct TimedDownload { started_at: std::time::Instant, - outcome: metrics::AttemptOutcome, + outcome: AttemptOutcome, #[pin] inner: S } impl PinnedDrop for TimedDownload { fn drop(mut this: Pin<&mut Self>) { - metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at); + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at); } } } @@ -451,7 +452,7 @@ impl TimedDownload { fn new(started_at: std::time::Instant, inner: S) -> Self { TimedDownload { started_at, - outcome: metrics::AttemptOutcome::Cancelled, + outcome: AttemptOutcome::Cancelled, inner, } } @@ -468,8 +469,8 @@ impl>> Stream for TimedDownload { let res = ready!(this.inner.poll_next(cx)); match &res { Some(Ok(_)) => {} - Some(Err(_)) => *this.outcome = metrics::AttemptOutcome::Err, - None => *this.outcome = metrics::AttemptOutcome::Ok, + Some(Err(_)) => *this.outcome = AttemptOutcome::Err, + None => *this.outcome = AttemptOutcome::Ok, } Poll::Ready(res) @@ -543,7 +544,7 @@ impl RemoteStorage for S3Bucket { let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .req_seconds .observe_elapsed(kind, &response, started_at); @@ -625,7 +626,7 @@ impl RemoteStorage for S3Bucket { if let Ok(inner) = &res { // do not incl. timeouts as errors in metrics but cancellations let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .req_seconds .observe_elapsed(kind, inner, started_at); } @@ -673,7 +674,7 @@ impl RemoteStorage for S3Bucket { }; let started_at = ScopeGuard::into_inner(started_at); - metrics::BUCKET_METRICS + crate::metrics::BUCKET_METRICS .req_seconds .observe_elapsed(kind, &res, started_at); @@ -977,28 +978,6 @@ impl RemoteStorage for S3Bucket { } } -/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. -fn start_counting_cancelled_wait( - kind: RequestKind, -) -> ScopeGuard { - scopeguard::guard_on_success(std::time::Instant::now(), move |_| { - metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc() - }) -} - -/// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`]. -fn start_measuring_requests( - kind: RequestKind, -) -> ScopeGuard { - scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| { - metrics::BUCKET_METRICS.req_seconds.observe_elapsed( - kind, - AttemptOutcome::Cancelled, - started_at, - ) - }) -} - // Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry struct VerOrDelete { kind: VerOrDeleteKind,