Skip to content

Commit

Permalink
Migrate physical GC and scan_metadata to remote_storage (#8673)
Browse files Browse the repository at this point in the history
Migrates most of the remaining parts of the scrubber to remote_storage:

* `pageserver_physical_gc`
* `scan_metadata` for pageservers (safekeepers were done in #8595)
* `download()` in `tenant_snapshot`. The main `tenant_snapshot` is not
migrated as it uses version history to be able to work in the face of
ongoing changes.
 
Part of #7547
  • Loading branch information
arpad-m authored and VladLazar committed Aug 20, 2024
1 parent 1cfe938 commit e2a42ec
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 343 deletions.
42 changes: 42 additions & 0 deletions libs/remote_storage/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,48 @@ impl RemoteStorage for AzureBlobStorage {
}
}

async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;

let started_at = start_measuring_requests(kind);

let blob_client = self.client.blob_client(self.relative_path_to_name(key));
let properties_future = blob_client.get_properties().into_future();

let properties_future = tokio::time::timeout(self.timeout, properties_future);

let res = tokio::select! {
res = properties_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};

if let Ok(inner) = &res {
// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}

let data = match res {
Ok(Ok(data)) => Ok(data),
Ok(Err(sdk)) => Err(to_download_error(sdk)),
Err(_timeout) => Err(DownloadError::Timeout),
}?;

let properties = data.blob.properties;
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::from(properties.last_modified),
size: properties.content_length,
})
}

async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
Expand Down
24 changes: 23 additions & 1 deletion libs/remote_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub enum ListingMode {
NoDelimiter,
}

#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
Expand Down Expand Up @@ -215,6 +215,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}

/// Obtain metadata information about an object.
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError>;

/// Streams the local file contents into remote into the remote storage entry.
///
/// If the operation fails because of timeout or cancellation, the root cause of the error will be
Expand Down Expand Up @@ -363,6 +370,20 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}

// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
match self {
Self::LocalFs(s) => s.head_object(key, cancel).await,
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
}
}

/// See [`RemoteStorage::upload`]
pub async fn upload(
&self,
Expand Down Expand Up @@ -598,6 +619,7 @@ impl ConcurrencyLimiter {
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
}
}

Expand Down
14 changes: 14 additions & 0 deletions libs/remote_storage/src/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,20 @@ impl RemoteStorage for LocalFs {
}
}

async fn head_object(
&self,
key: &RemotePath,
_cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let target_file_path = key.with_base(&self.storage_root);
let metadata = file_metadata(&target_file_path).await?;
Ok(ListingObject {
key: key.clone(),
last_modified: metadata.modified()?,
size: metadata.len(),
})
}

async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync,
Expand Down
9 changes: 6 additions & 3 deletions libs/remote_storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) enum RequestKind {
List = 3,
Copy = 4,
TimeTravel = 5,
Head = 6,
}

use scopeguard::ScopeGuard;
Expand All @@ -27,14 +28,16 @@ impl RequestKind {
List => "list_objects",
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
}
}
const fn as_index(&self) -> usize {
*self as usize
}
}

pub(crate) struct RequestTyped<C>([C; 6]);
const REQUEST_KIND_COUNT: usize = 7;
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);

impl<C> RequestTyped<C> {
pub(crate) fn get(&self, kind: RequestKind) -> &C {
Expand All @@ -43,8 +46,8 @@ impl<C> RequestTyped<C> {

fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
let arr = std::array::from_fn::<C, 6, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)
Expand Down
74 changes: 73 additions & 1 deletion libs/remote_storage/src/s3_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use aws_config::{
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::get_object::GetObjectError,
operation::{get_object::GetObjectError, head_object::HeadObjectError},
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass},
Client,
};
Expand Down Expand Up @@ -604,6 +604,78 @@ impl RemoteStorage for S3Bucket {
}
}

async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
let _permit = self.permit(kind, cancel).await?;

let started_at = start_measuring_requests(kind);

let head_future = self
.client
.head_object()
.bucket(self.bucket_name())
.key(self.relative_path_to_s3_object(key))
.send();

let head_future = tokio::time::timeout(self.timeout, head_future);

let res = tokio::select! {
res = head_future => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};

let res = res.map_err(|_e| DownloadError::Timeout)?;

// do not incl. timeouts as errors in metrics but cancellations
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);

let data = match res {
Ok(object_output) => object_output,
Err(SdkError::ServiceError(e)) if matches!(e.err(), HeadObjectError::NotFound(_)) => {
// Count this in the AttemptOutcome::Ok bucket, because 404 is not
// an error: we expect to sometimes fetch an object and find it missing,
// e.g. when probing for timeline indices.
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Ok,
started_at,
);
return Err(DownloadError::NotFound);
}
Err(e) => {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);

return Err(DownloadError::Other(
anyhow::Error::new(e).context("s3 head object"),
));
}
};

let (Some(last_modified), Some(size)) = (data.last_modified, data.content_length) else {
return Err(DownloadError::Other(anyhow!(
"head_object doesn't contain last_modified or content_length"
)))?;
};
Ok(ListingObject {
key: key.to_owned(),
last_modified: SystemTime::try_from(last_modified).map_err(|e| {
DownloadError::Other(anyhow!("can't convert time '{last_modified}': {e}"))
})?,
size: size as u64,
})
}

async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
Expand Down
11 changes: 11 additions & 0 deletions libs/remote_storage/src/simulate_failures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct UnreliableWrapper {
#[derive(Debug, Hash, Eq, PartialEq)]
enum RemoteOp {
ListPrefixes(Option<RemotePath>),
HeadObject(RemotePath),
Upload(RemotePath),
Download(RemotePath),
Delete(RemotePath),
Expand Down Expand Up @@ -137,6 +138,16 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}

async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<crate::ListingObject, DownloadError> {
self.attempt(RemoteOp::HeadObject(key.clone()))
.map_err(DownloadError::Other)?;
self.inner.head_object(key, cancel).await
}

async fn upload(
&self,
data: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
Expand Down
Loading

0 comments on commit e2a42ec

Please sign in to comment.