Skip to content

Commit

Permalink
Add concurrency to the find-large-objects scrubber subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
arpad-m committed Jul 5, 2024
1 parent c9fd8d7 commit a6975f0
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 41 deletions.
113 changes: 75 additions & 38 deletions storage_scrubber/src/find_large_objects.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use futures::StreamExt;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -29,7 +34,7 @@ impl LargeObjectKind {
}
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct LargeObject {
pub key: String,
pub size: u64,
Expand All @@ -45,53 +50,85 @@ pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));

let tenant_ctr = Arc::new(AtomicU64::new(0));
let object_ctr = Arc::new(AtomicU64::new(0));

let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
let tenant_ctr = tenant_ctr.clone();
let object_ctr = object_ctr.clone();
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await
.expect("couldn't list objects");
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}

tenant_ctr.fetch_add(1, Ordering::Relaxed);

Ok((tenant_shard_id, objects))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));

//let mut objects_stream = objects_stream.flatten();

let mut objects = Vec::new();
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice) = res?;
objects.extend_from_slice(&objects_slice);

let tenants_count = tenant_ctr.load(Ordering::Relaxed);

if tenants_count % 100 == 0 {
let objects_count = object_ctr.load(Ordering::Relaxed);

tenant_ctr += 1;
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
"Scanned {tenants_count} shards. objects={objects_count}, found={}, current={tenant_shard_id}.",
objects.len()
);
}
}

let tenants_count = tenant_ctr.load(Ordering::Relaxed);
let objects_count = object_ctr.load(Ordering::Relaxed);
tracing::info!(
"Scan finished. Scanned {tenants_count} shards. objects={objects_count}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}
13 changes: 10 additions & 3 deletions storage_scrubber/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ enum Command {
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}

Expand Down Expand Up @@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> {
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}
Expand Down

0 comments on commit a6975f0

Please sign in to comment.