From a6975f0ce0d5c90e5c24bfb947abb6d6deaac936 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 5 Jul 2024 19:28:23 +0200
Subject: [PATCH] Add concurrency to the find-large-objects scrubber subcommand

---
 storage_scrubber/src/find_large_objects.rs | 116 ++++++++++++++-------
 storage_scrubber/src/main.rs               |  13 ++-
 2 files changed, 88 insertions(+), 41 deletions(-)

diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs
index 24668b65169a..d9bf83a7cee9 100644
--- a/storage_scrubber/src/find_large_objects.rs
+++ b/storage_scrubber/src/find_large_objects.rs
@@ -1,4 +1,9 @@
-use futures::StreamExt;
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
+
+use futures::{StreamExt, TryStreamExt};
 use pageserver::tenant::storage_layer::LayerName;
 use serde::{Deserialize, Serialize};
 
@@ -29,7 +34,7 @@ impl LargeObjectKind {
     }
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
 pub struct LargeObject {
     pub key: String,
     pub size: u64,
@@ -45,53 +50,88 @@ pub async fn find_large_objects(
     bucket_config: BucketConfig,
     min_size: u64,
     ignore_deltas: bool,
+    concurrency: usize,
 ) -> anyhow::Result<LargeObjectListing> {
     let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
-    let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
-    let mut objects = Vec::new();
-    let mut tenant_ctr = 0u64;
-    let mut object_ctr = 0u64;
-    while let Some(tenant_shard_id) = tenants.next().await {
-        let tenant_shard_id = tenant_shard_id?;
+    let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
+
+    // Progress counters shared between the per-tenant futures (writers) and the
+    // logging loop below (reader).
+    let tenant_ctr = Arc::new(AtomicU64::new(0));
+    let object_ctr = Arc::new(AtomicU64::new(0));
+
+    // Map every tenant shard to a future listing its objects; the futures only run
+    // once `try_buffer_unordered` polls them, at most `concurrency` at a time.
+    let objects_stream = tenants.map_ok(|tenant_shard_id| {
         let mut tenant_root = target.tenant_root(&tenant_shard_id);
-        // We want the objects and not just common prefixes
-        tenant_root.delimiter.clear();
-        let mut continuation_token = None;
-        loop {
-            let fetch_response =
-                list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
-                    .await?;
-            for obj in fetch_response.contents().iter().filter(|o| {
-                if let Some(obj_size) = o.size {
-                    min_size as i64 <= obj_size
-                } else {
-                    false
+        let tenant_ctr = tenant_ctr.clone();
+        let object_ctr = object_ctr.clone();
+        let s3_client = s3_client.clone();
+        async move {
+            let mut objects = Vec::new();
+            // We want the objects and not just common prefixes
+            tenant_root.delimiter.clear();
+            let mut continuation_token = None;
+            loop {
+                // Propagate listing failures to the caller rather than panicking.
+                let fetch_response =
+                    list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
+                        .await?;
+                for obj in fetch_response.contents().iter().filter(|o| {
+                    if let Some(obj_size) = o.size {
+                        min_size as i64 <= obj_size
+                    } else {
+                        false
+                    }
+                }) {
+                    let key = obj.key().expect("couldn't get key").to_owned();
+                    let kind = LargeObjectKind::from_key(&key);
+                    if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
+                        continue;
+                    }
+                    objects.push(LargeObject {
+                        key,
+                        size: obj.size.unwrap() as u64,
+                        kind,
+                    })
                 }
-            }) {
-                let key = obj.key().expect("couldn't get key").to_owned();
-                let kind = LargeObjectKind::from_key(&key);
-                if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
-                    continue;
+                object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed);
+                match fetch_response.next_continuation_token {
+                    Some(new_token) => continuation_token = Some(new_token),
+                    None => break,
                 }
-                objects.push(LargeObject {
-                    key,
-                    size: obj.size.unwrap() as u64,
-                    kind,
-                })
-            }
-            object_ctr += fetch_response.contents().len() as u64;
-            match fetch_response.next_continuation_token {
-                Some(new_token) => continuation_token = Some(new_token),
-                None => break,
             }
+
+            tenant_ctr.fetch_add(1, Ordering::Relaxed);
+
+            Ok((tenant_shard_id, objects))
         }
+    });
+    let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
+
+    let mut objects = Vec::new();
+    while let Some(res) = objects_stream.next().await {
+        let (tenant_shard_id, tenant_objects) = res?;
+        // Move the per-tenant results; no clone needed.
+        objects.extend(tenant_objects);
+
+        let tenants_count = tenant_ctr.load(Ordering::Relaxed);
+
+        if tenants_count % 100 == 0 {
+            let objects_count = object_ctr.load(Ordering::Relaxed);
 
-        tenant_ctr += 1;
-        if tenant_ctr % 50 == 0 {
             tracing::info!(
-                "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
+                "Scanned {tenants_count} shards. objects={objects_count}, found={}, current={tenant_shard_id}.",
+                objects.len()
             );
         }
     }
+
+    let tenants_count = tenant_ctr.load(Ordering::Relaxed);
+    let objects_count = object_ctr.load(Ordering::Relaxed);
+    tracing::info!(
+        "Scan finished. Scanned {tenants_count} shards. objects={objects_count}, found={}.",
+        objects.len()
+    );
     Ok(LargeObjectListing { objects })
 }
diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs
index 10699edd3c94..16a26613d25b 100644
--- a/storage_scrubber/src/main.rs
+++ b/storage_scrubber/src/main.rs
@@ -78,6 +78,8 @@ enum Command {
         min_size: u64,
         #[arg(short, long, default_value_t = false)]
         ignore_deltas: bool,
+        #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
+        concurrency: usize,
     },
 }
 
@@ -210,10 +212,15 @@ async fn main() -> anyhow::Result<()> {
         Command::FindLargeObjects {
             min_size,
             ignore_deltas,
+            concurrency,
         } => {
-            let summary =
-                find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
-                    .await?;
+            let summary = find_large_objects::find_large_objects(
+                bucket_config,
+                min_size,
+                ignore_deltas,
+                concurrency,
+            )
+            .await?;
             println!("{}", serde_json::to_string(&summary).unwrap());
             Ok(())
         }