pageserver: post-shard-split layer rewrites (2/2) (#7531)
## Problem

- After a shard split of a large existing tenant, child tenants can end
up with oversized historic layers indefinitely, if those layers are
prevented from being GC'd by branchpoints.

This PR follows #7531 and adds rewriting of layers that contain a
mixture of needed and un-needed contents, in addition to dropping
layers that are entirely un-needed.

Closes: #7504

## Summary of changes

- Add methods to ImageLayer for reading back existing layers
- Extend `compact_shard_ancestors` to rewrite layer files that contain a
mixture of keys that we want and keys we do not, if unwanted keys are
the majority of those in the file (see the sketch after this list).
- Amend initialization code to handle multiple layers with the same
LayerName properly
- Get rid of renaming bad layer files to `.old`, since that's now
expected on restarts during rewrites.
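
As a rough illustration of that rewrite decision, here is a minimal sketch. The `plan_layer_action` function and `LayerAction` enum are hypothetical names introduced for this example; `ShardIdentity::is_key_disposable` is the real predicate used in the diff below, and the actual decision logic lives in `compact_shard_ancestors`:

```rust
use pageserver_api::key::Key;
use pageserver_api::shard::ShardIdentity;

/// Hypothetical outcome type, for illustration only.
enum LayerAction {
    /// No keys in the layer belong to this shard: the file can be deleted.
    Drop,
    /// Unwanted keys are the majority: rewrite the file, keeping only local keys.
    Rewrite,
    /// Mostly local keys: rewriting would reclaim little space.
    Keep,
}

/// Sketch of the per-layer decision during shard-ancestor compaction.
fn plan_layer_action(shard: &ShardIdentity, keys_in_layer: &[Key]) -> LayerAction {
    // Count the keys this shard actually owns.
    let mut wanted: usize = 0;
    for key in keys_in_layer {
        if !shard.is_key_disposable(key) {
            wanted += 1;
        }
    }

    if wanted == 0 {
        LayerAction::Drop
    } else if wanted * 2 < keys_in_layer.len() {
        LayerAction::Rewrite
    } else {
        LayerAction::Keep
    }
}
```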
jcsp authored May 24, 2024
1 parent c1f4028 commit 3860bc9
Showing 8 changed files with 548 additions and 193 deletions.
200 changes: 196 additions & 4 deletions pageserver/src/tenant/storage_layer/image_layer.rs
@@ -47,7 +47,7 @@ use hex;
use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
- use pageserver_api::shard::TenantShardId;
+ use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::File;
@@ -473,7 +473,7 @@ impl ImageLayerInner {
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
- .plan_reads(keyspace, ctx)
+ .plan_reads(keyspace, None, ctx)
.await
.map_err(GetVectoredError::Other)?;

@@ -485,9 +485,15 @@
Ok(())
}

/// Traverse the layer's index to build read operations on the overlap of the input keyspace
/// and the keys in this layer.
///
/// If shard_identity is provided, it will be used to filter keys down to those stored on
/// this shard.
async fn plan_reads(
&self,
keyspace: KeySpace,
shard_identity: Option<&ShardIdentity>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(
@@ -507,7 +513,6 @@

for range in keyspace.ranges.iter() {
let mut range_end_handled = false;

let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);

@@ -520,12 +525,22 @@
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);

let flag = if let Some(shard_identity) = shard_identity {
if shard_identity.is_key_disposable(&key) {
BlobFlag::Ignore
} else {
BlobFlag::None
}
} else {
BlobFlag::None
};

if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
break;
} else {
- planner.handle(key, self.lsn, offset, BlobFlag::None);
+ planner.handle(key, self.lsn, offset, flag);
}
}

@@ -538,6 +553,50 @@
Ok(planner.finish())
}

/// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
/// then execute vectored GET operations, passing the results of all read keys into the writer.
pub(super) async fn filter(
&self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
// Fragment the range into the regions owned by this ShardIdentity
let plan = self
.plan_reads(
KeySpace {
// If asked for the total key space, plan_reads will give us all the keys in the layer
ranges: vec![Key::MIN..Key::MAX],
},
Some(shard_identity),
ctx,
)
.await?;

let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut key_count = 0;
for read in plan.into_iter() {
let buf_size = read.size();

let buf = BytesMut::with_capacity(buf_size);
let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;

let frozen_buf = blobs_buf.buf.freeze();

for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);

key_count += 1;
writer
.put_image(meta.meta.key, img_buf, ctx)
.await
.context(format!("Storing key {}", meta.meta.key))?;
}
}

Ok(key_count)
}

async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
@@ -855,3 +914,136 @@ impl Drop for ImageLayerWriter
}
}
}

#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::{
key::Key,
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
};
use utils::{id::TimelineId, lsn::Lsn};

use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};

use super::ImageLayerWriter;

#[tokio::test]
async fn image_layer_rewrite() {
let harness = TenantHarness::create("test_image_layer_rewrite").unwrap();
let (tenant, ctx) = harness.load().await;

// The LSN at which we will create an image layer to filter
let lsn = Lsn(0xdeadbeef0000);

let timeline_id = TimelineId::generate();
let timeline = tenant
.create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();

// This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
let range = input_start..input_end;

// Build an image layer to filter
let resident = {
let mut writer = ImageLayerWriter::new(
harness.conf,
timeline_id,
harness.tenant_shard_id,
&range,
lsn,
&ctx,
)
.await
.unwrap();

let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
let mut key = range.start;
while key < range.end {
writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();

key = key.next();
}
writer.finish(&timeline, &ctx).await.unwrap()
};
let original_size = resident.metadata().file_size;

// Filter for various shards: this exercises cases like values at start of key range, end of key
// range, middle of key range.
for shard_number in 0..4 {
let mut filtered_writer = ImageLayerWriter::new(
harness.conf,
timeline_id,
harness.tenant_shard_id,
&range,
lsn,
&ctx,
)
.await
.unwrap();

// TenantHarness gave us an unsharded tenant, but we'll use a sharded ShardIdentity
// to exercise filter()
let shard_identity = ShardIdentity::new(
ShardNumber(shard_number),
ShardCount::new(4),
ShardStripeSize(0x8000),
)
.unwrap();

let wrote_keys = resident
.filter(&shard_identity, &mut filtered_writer, &ctx)
.await
.unwrap();
let replacement = if wrote_keys > 0 {
Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
} else {
None
};

// This exact size and those below will need updating as/when the layer encoding changes, but
// should be deterministic for a given version of the format, as we used no randomness generating the input.
assert_eq!(original_size, 1597440);

match shard_number {
0 => {
// We should have written out just one stripe for our shard identity
assert_eq!(wrote_keys, 0x8000);
let replacement = replacement.unwrap();

// We should have dropped some of the data
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);

// Assert that we dropped ~3/4 of the data.
assert_eq!(replacement.metadata().file_size, 417792);
}
1 => {
// Shard 1 has no keys in our input range
assert_eq!(wrote_keys, 0x0);
assert!(replacement.is_none());
}
2 => {
// Shard 2 has one stripe in the input range
assert_eq!(wrote_keys, 0x8000);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 417792);
}
3 => {
// Shard 3 has two stripes in the input range
assert_eq!(wrote_keys, 0x10000);
let replacement = replacement.unwrap();
assert!(replacement.metadata().file_size < original_size);
assert!(replacement.metadata().file_size > 0);
assert_eq!(replacement.metadata().file_size, 811008);
}
_ => unreachable!(),
}
}
}
}
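
Condensed from the test above, the basic filter-and-rewrite flow looks like this (a sketch reusing the test's setup: `resident`, `shard_identity`, `timeline`, `timeline_id`, `range`, `lsn`, `harness`, and `ctx` are assumed to be bound as in the test):

```rust
// Open a writer for the replacement layer, covering the same key range and LSN
// as the source layer.
let mut filtered_writer = ImageLayerWriter::new(
    harness.conf,
    timeline_id,
    harness.tenant_shard_id,
    &range,
    lsn,
    &ctx,
)
.await?;

// Copy across only the keys that this ShardIdentity retains.
let wrote_keys = resident
    .filter(&shard_identity, &mut filtered_writer, &ctx)
    .await?;

// Zero keys written means this shard owns nothing in the layer: drop it
// entirely rather than finishing an empty replacement.
let replacement = if wrote_keys > 0 {
    Some(filtered_writer.finish(&timeline, &ctx).await?)
} else {
    None
};
```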
32 changes: 24 additions & 8 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -4,7 +4,7 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
- use pageserver_api::shard::{ShardIndex, TenantShardId};
+ use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -23,10 +23,10 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};

use super::delta_layer::{self, DeltaEntry};
- use super::image_layer;
+ use super::image_layer::{self};
use super::{
- AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerName, PersistentLayerDesc,
- ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
+ AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
+ PersistentLayerDesc, ValueReconstructResult, ValueReconstructState, ValuesReconstructState,
};

use utils::generation::Generation;
@@ -1802,16 +1802,15 @@ impl ResidentLayer {
use LayerKind::*;

let owner = &self.owner.0;

match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => {
- // this is valid because the DownloadedLayer::kind is a OnceCell, not a
- // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
- // while it's being held.
owner
.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);

+ // this is valid because the DownloadedLayer::kind is a OnceCell, not a
+ // Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
+ // while it's being held.
delta_layer::DeltaLayerInner::load_keys(d, ctx)
.await
.with_context(|| format!("Layer index is corrupted for {self}"))
@@ -1820,6 +1819,23 @@
}
}

/// Read all the keys in this layer which match the ShardIdentity, and write them all to
/// the provided writer. Return the number of keys written.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
pub(crate) async fn filter<'a>(
&'a self,
shard_identity: &ShardIdentity,
writer: &mut ImageLayerWriter,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
use LayerKind::*;

match self.downloaded.get(&self.owner.0, ctx).await? {
Delta(_) => anyhow::bail!(format!("cannot filter() on a delta layer {self}")),
Image(i) => i.filter(shard_identity, writer, ctx).await,
}
}

/// Returns the amount of keys and values written to the writer.
pub(crate) async fn copy_delta_prefix(
&self,
(diffs for the remaining 6 changed files not shown)

1 comment on commit 3860bc9

@github-actions

3220 tests run: 3080 passed, 0 failed, 140 skipped (full report)


Flaky tests (1)

Postgres 15

  • test_timeline_deletion_with_files_stuck_in_upload_queue: debug

Code coverage* (full report)

  • functions: 31.4% (6450 of 20545 functions)
  • lines: 48.3% (49971 of 103400 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
3860bc9 at 2024-05-24T09:55:41.121Z :recycle:
