Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cleanup of layers from the future can race with their re-creation #5890

Merged
merged 9 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pageserver/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ tokio.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true
serde.workspace = true
serde_json.workspace = true
38 changes: 38 additions & 0 deletions pageserver/ctl/src/index_part.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::collections::HashMap;

use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
use utils::lsn::Lsn;

#[derive(clap::Subcommand)]
pub(crate) enum IndexPartCmd {
Dump { path: Utf8PathBuf },
}

pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd {
IndexPartCmd::Dump { path } => {
let bytes = tokio::fs::read(path).await.context("read file")?;
let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
#[derive(serde::Serialize)]
struct Output<'a> {
layer_metadata: &'a HashMap<LayerFileName, IndexLayerMetadata>,
disk_consistent_lsn: Lsn,
timeline_metadata: &'a TimelineMetadata,
}

let output = Output {
layer_metadata: &des.layer_metadata,
disk_consistent_lsn: des.get_disk_consistent_lsn(),
timeline_metadata: &des.metadata,
};

let output = serde_json::to_string_pretty(&output).context("serialize output")?;
println!("{output}");
Ok(())
}
}
}
7 changes: 7 additions & 0 deletions pageserver/ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.

mod draw_timeline_dir;
mod index_part;
mod layer_map_analyzer;
mod layers;

use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use index_part::IndexPartCmd;
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
Expand Down Expand Up @@ -38,6 +40,8 @@ struct CliOpts {
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
#[command(subcommand)]
IndexPart(IndexPartCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
Expand Down Expand Up @@ -83,6 +87,9 @@ async fn main() -> anyhow::Result<()> {
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::IndexPart(cmd) => {
index_part::main(&cmd).await?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}
Expand Down
1 change: 1 addition & 0 deletions pageserver/src/deletion_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ impl DeletionQueueClient {
) -> Result<(), DeletionQueueError> {
if current_generation.is_none() {
debug!("Enqueuing deletions in legacy mode, skipping queue");

let mut layer_paths = Vec::new();
for (layer, generation) in layers {
layer_paths.push(remote_layer_path(
Expand Down
15 changes: 13 additions & 2 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header;
Expand Down Expand Up @@ -42,6 +43,7 @@ use crate::tenant::mgr::{
};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
Expand Down Expand Up @@ -1268,11 +1270,15 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;

let mut flags = EnumSet::empty();
if let Some(true) = parse_query_param::<_, bool>(&request, "force_repartition")? {
problame marked this conversation as resolved.
Show resolved Hide resolved
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, ())
Expand All @@ -1289,6 +1295,11 @@ async fn timeline_checkpoint_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;

let mut flags = EnumSet::empty();
if let Some(true) = parse_query_param::<_, bool>(&request, "force_repartition")? {
problame marked this conversation as resolved.
Show resolved Hide resolved
flags |= CompactFlags::ForceRepartition;
}
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
Expand All @@ -1297,7 +1308,7 @@ async fn timeline_checkpoint_handler(
.await
.map_err(ApiError::InternalServerError)?;
timeline
.compact(&cancel, &ctx)
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;

Expand Down
31 changes: 23 additions & 8 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
Expand Down Expand Up @@ -1699,7 +1700,7 @@ impl Tenant {

for (timeline_id, timeline) in &timelines_to_compact {
timeline
.compact(cancel, ctx)
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
}
Expand Down Expand Up @@ -4298,7 +4299,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
Expand All @@ -4313,7 +4316,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
Expand All @@ -4328,7 +4333,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
Expand All @@ -4343,7 +4350,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
Expand Down Expand Up @@ -4414,7 +4423,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

Expand Down Expand Up @@ -4494,7 +4505,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

Expand Down Expand Up @@ -4584,7 +4597,9 @@ mod tests {
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

Expand Down
33 changes: 21 additions & 12 deletions pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ impl RemoteTimelineClient {
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier(upload_queue)
self.schedule_barrier0(upload_queue)
};

if receiver.changed().await.is_err() {
Expand All @@ -825,7 +825,14 @@ impl RemoteTimelineClient {
Ok(())
}

fn schedule_barrier(
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
Ok(())
}

fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
Expand Down Expand Up @@ -1229,16 +1236,18 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(delete) => self
.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e)),
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
UploadOp::Barrier(_) => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
Expand Down
18 changes: 13 additions & 5 deletions pageserver/src/tenant/remote_timeline_client/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ impl IndexPart {
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}

pub fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<IndexPart>(bytes)
}

pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}

impl TryFrom<&UploadQueueInitialized> for IndexPart {
Expand Down Expand Up @@ -201,7 +209,7 @@ mod tests {
deleted_at: None,
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

Expand Down Expand Up @@ -239,7 +247,7 @@ mod tests {
deleted_at: None,
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

Expand Down Expand Up @@ -279,7 +287,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

Expand Down Expand Up @@ -323,7 +331,7 @@ mod tests {
deleted_at: None,
};

let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();

assert_eq!(empty_layers_parsed, expected);
}
Expand Down Expand Up @@ -361,7 +369,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
}
5 changes: 3 additions & 2 deletions pageserver/src/tenant/remote_timeline_client/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ pub(super) async fn upload_index_part<'a>(
});
pausable_failpoint!("before-upload-index-pausable");

let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
let index_part_bytes = index_part
.to_s3_bytes()
.context("serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));

Expand Down
Loading
Loading