Skip to content

Commit

Permalink
Remove atomics completely
Browse files (browse the repository at this point in the history)
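They are not needed: each per-tenant future now returns its own object count, and the counts are summed in the single loop that consumes the stream.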
  • Loading branch information
arpad-m committed Jul 5, 2024
1 parent beeb8b7 commit 9c5e2f3
Showing 1 changed file with 10 additions and 17 deletions.
27 changes: 10 additions & 17 deletions storage_scrubber/src/find_large_objects.rs
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,3 @@
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};

use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};
Expand Down Expand Up
@@ -55,14 +50,12 @@ pub async fn find_large_objects(
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));

let object_ctr = Arc::new(AtomicU64::new(0));

let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let object_ctr = object_ctr.clone();
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
Expand All
@@ -88,39 +81,39 @@ pub async fn find_large_objects(
kind,
})
}
object_ctr.fetch_add(fetch_response.contents().len() as u64, Ordering::Relaxed);
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}

Ok((tenant_shard_id, objects))
Ok((tenant_shard_id, objects, total_objects_ctr))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));

let mut objects = Vec::new();

let tenant_ctr = 0u64;
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice) = res?;
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
objects.extend_from_slice(&objects_slice);

object_ctr += total_objects_ctr;
tenant_ctr += 1;
if tenant_ctr % 100 == 0 {
let objects_count = object_ctr.load(Ordering::Relaxed);

tracing::info!(
"Scanned {tenant_ctr} shards. objects={objects_count}, found={}, current={tenant_shard_id}.",
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
objects.len()
);
}
}

let bucket_name = target.bucket_name();
let objects_count = object_ctr.load(Ordering::Relaxed);
tracing::info!(
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={objects_count}, found={}.",
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
Expand Down

0 comments on commit 9c5e2f3

Please sign in to comment.