Skip to content

Commit

Permalink
Add metrics for Azure blob storage (#7933)
Browse files Browse the repository at this point in the history
In issue #5590 it was proposed to implement metrics for Azure blob
storage. This PR implements them except for the part that performs the
rename, which is left for a followup.

Closes #5590
  • Loading branch information
arpad-m committed Jun 2, 2024
1 parent a345cf3 commit db477c0
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 78 deletions.
85 changes: 63 additions & 22 deletions libs/remote_storage/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ use futures::stream::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http_types::{StatusCode, Url};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use tracing::debug;

use crate::metrics::{start_measuring_requests, AttemptOutcome, RequestKind};
use crate::{
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
TimeTravelError, TimeoutOrCancel,
error::Cancelled, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing,
ListingMode, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
};

pub struct AzureBlobStorage {
Expand Down Expand Up @@ -137,6 +138,8 @@ impl AzureBlobStorage {
let mut last_modified = None;
let mut metadata = HashMap::new();

let started_at = start_measuring_requests(kind);

let download = async {
let response = builder
// convert to concrete Pageable
Expand Down Expand Up @@ -200,13 +203,22 @@ impl AzureBlobStorage {
})
};

tokio::select! {
let download = tokio::select! {
bufs = download => bufs,
cancel_or_timeout = cancel_or_timeout => match cancel_or_timeout {
TimeoutOrCancel::Timeout => Err(DownloadError::Timeout),
TimeoutOrCancel::Cancel => Err(DownloadError::Cancelled),
TimeoutOrCancel::Timeout => return Err(DownloadError::Timeout),
TimeoutOrCancel::Cancel => return Err(DownloadError::Cancelled),
},
}
};
let started_at = ScopeGuard::into_inner(started_at);
let outcome = match &download {
Ok(_) => AttemptOutcome::Ok,
Err(_) => AttemptOutcome::Err,
};
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, outcome, started_at);
download
}

async fn permit(
Expand Down Expand Up @@ -340,7 +352,10 @@ impl RemoteStorage for AzureBlobStorage {
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Put, cancel).await?;
let kind = RequestKind::Put;
let _permit = self.permit(kind, cancel).await?;

let started_at = start_measuring_requests(kind);

let op = async {
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
Expand All @@ -364,14 +379,25 @@ impl RemoteStorage for AzureBlobStorage {
match fut.await {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure.into()),
Err(_timeout) => Err(TimeoutOrCancel::Cancel.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
};

tokio::select! {
let res = tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()),
}
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};

let outcome = match res {
Ok(_) => AttemptOutcome::Ok,
Err(_) => AttemptOutcome::Err,
};
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, outcome, started_at);

res
}

async fn download(
Expand Down Expand Up @@ -417,12 +443,13 @@ impl RemoteStorage for AzureBlobStorage {
paths: &'a [RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Delete, cancel).await?;
let kind = RequestKind::Delete;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);

let op = async {
// TODO batch requests are also not supported by the SDK
// TODO batch requests are not supported by the SDK
// https://github.com/Azure/azure-sdk-for-rust/issues/1068
// https://github.com/Azure/azure-sdk-for-rust/issues/1249
for path in paths {
let blob_client = self.client.blob_client(self.relative_path_to_name(path));

Expand All @@ -447,10 +474,16 @@ impl RemoteStorage for AzureBlobStorage {
Ok(())
};

tokio::select! {
let res = tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(TimeoutOrCancel::Cancel.into()),
}
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};

let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
res
}

async fn copy(
Expand All @@ -459,7 +492,9 @@ impl RemoteStorage for AzureBlobStorage {
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let _permit = self.permit(RequestKind::Copy, cancel).await?;
let kind = RequestKind::Copy;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);

let timeout = tokio::time::sleep(self.timeout);

Expand Down Expand Up @@ -503,15 +538,21 @@ impl RemoteStorage for AzureBlobStorage {
}
};

tokio::select! {
let res = tokio::select! {
res = op => res,
_ = cancel.cancelled() => Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
_ = cancel.cancelled() => return Err(anyhow::Error::new(TimeoutOrCancel::Cancel)),
_ = timeout => {
let e = anyhow::Error::new(TimeoutOrCancel::Timeout);
let e = e.context(format!("Timeout, last status: {copy_status:?}"));
Err(e)
},
}
};

let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
res
}

async fn time_travel_recover(
Expand Down
1 change: 1 addition & 0 deletions libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
mod azure_blob;
mod error;
mod local_fs;
mod metrics;
mod s3_bucket;
mod simulate_failures;
mod support;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub(crate) enum RequestKind {
TimeTravel = 5,
}

use scopeguard::ScopeGuard;
use RequestKind::*;

impl RequestKind {
Expand All @@ -33,10 +34,10 @@ impl RequestKind {
}
}

pub(super) struct RequestTyped<C>([C; 6]);
pub(crate) struct RequestTyped<C>([C; 6]);

impl<C> RequestTyped<C> {
pub(super) fn get(&self, kind: RequestKind) -> &C {
pub(crate) fn get(&self, kind: RequestKind) -> &C {
&self.0[kind.as_index()]
}

Expand All @@ -58,19 +59,19 @@ impl<C> RequestTyped<C> {
}

impl RequestTyped<Histogram> {
pub(super) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
self.get(kind).observe(started_at.elapsed().as_secs_f64())
}
}

pub(super) struct PassFailCancelledRequestTyped<C> {
pub(crate) struct PassFailCancelledRequestTyped<C> {
success: RequestTyped<C>,
fail: RequestTyped<C>,
cancelled: RequestTyped<C>,
}

#[derive(Debug, Clone, Copy)]
pub(super) enum AttemptOutcome {
pub(crate) enum AttemptOutcome {
Ok,
Err,
Cancelled,
Expand All @@ -86,7 +87,7 @@ impl<T, E> From<&Result<T, E>> for AttemptOutcome {
}

impl AttemptOutcome {
pub(super) fn as_str(&self) -> &'static str {
pub(crate) fn as_str(&self) -> &'static str {
match self {
AttemptOutcome::Ok => "ok",
AttemptOutcome::Err => "err",
Expand All @@ -96,7 +97,7 @@ impl AttemptOutcome {
}

impl<C> PassFailCancelledRequestTyped<C> {
pub(super) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
let target = match outcome {
AttemptOutcome::Ok => &self.success,
AttemptOutcome::Err => &self.fail,
Expand All @@ -119,7 +120,7 @@ impl<C> PassFailCancelledRequestTyped<C> {
}

impl PassFailCancelledRequestTyped<Histogram> {
pub(super) fn observe_elapsed(
pub(crate) fn observe_elapsed(
&self,
kind: RequestKind,
outcome: impl Into<AttemptOutcome>,
Expand All @@ -130,19 +131,44 @@ impl PassFailCancelledRequestTyped<Histogram> {
}
}

pub(super) struct BucketMetrics {
/// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
pub(crate) fn start_counting_cancelled_wait(
kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
crate::metrics::BUCKET_METRICS
.cancelled_waits
.get(kind)
.inc()
})
}

/// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
pub(crate) fn start_measuring_requests(
kind: RequestKind,
) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Cancelled,
started_at,
)
})
}

pub(crate) struct BucketMetrics {
/// Full request duration until successful completion, error or cancellation.
pub(super) req_seconds: PassFailCancelledRequestTyped<Histogram>,
pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
/// Total amount of seconds waited on queue.
pub(super) wait_seconds: RequestTyped<Histogram>,
pub(crate) wait_seconds: RequestTyped<Histogram>,

/// Track how many semaphore awaits were cancelled per request type.
///
/// This is in case cancellations are happening more than expected.
pub(super) cancelled_waits: RequestTyped<IntCounter>,
pub(crate) cancelled_waits: RequestTyped<IntCounter>,

/// Total amount of deleted objects in batches or single requests.
pub(super) deleted_objects_total: IntCounter,
pub(crate) deleted_objects_total: IntCounter,
}

impl Default for BucketMetrics {
Expand Down
Loading

1 comment on commit db477c0

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3244 tests run: 3091 passed, 1 failed, 152 skipped (full report)


Failures on Postgres 14

  • test_basebackup_with_high_slru_count[github-actions-selfhosted-sequential-10-13-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_basebackup_with_high_slru_count[release-pg14-github-actions-selfhosted-sequential-10-13-30]"
Flaky tests (1)

Postgres 15

  • test_vm_bit_clear_on_heap_lock: debug

Code coverage* (full report)

  • functions: 31.4% (6532 of 20798 functions)
  • lines: 48.3% (50419 of 104319 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
db477c0 at 2024-06-02T15:30:55.854Z :recycle:

Please sign in to comment.