
Commit 98dadf8

pageserver: quieten some shutdown logs around logical size and flush (#7907)

## Problem

Looking at several noisy shutdown logs:
- In #7861 we're hitting a log error with `InternalServerError(timeline shutting down\n'` on the checkpoint API handler.
- In the field, we see initial_logical_size_calculation errors on shutdown, via DownloadError.
- In the field, we see errors logged from layer download code (independent of the error propagated) during shutdown.

Closes: #7861

## Summary of changes

The theme of these changes is to avoid propagating anyhow::Error for cases that aren't really unexpected and wouldn't benefit from a stacktrace, and to reserve "Other" error variants for errors that genuinely are unexpected.

- On the flush_frozen_layers path, use the `FlushLayerError` type throughout, rather than munging it into an anyhow::Error. Give FlushLayerError an explicit from_anyhow helper that checks for timeline cancellation and returns a Cancelled error instead of an Other error when the timeline is shutting down (a sketch of this pattern follows the list).
- In logical size calculation, remove BackgroundCalculationError (the type was just a Cancelled variant plus an Other variant) and use CalculateLogicalSizeError throughout. This can express a PageReconstructError, and has a From impl that translates cancel-like page reconstruct errors to Cancelled.
- Replace CalculateLogicalSizeError's Other(anyhow::Error) variant with a Decode(DeserializeError) variant, since decode failures were the only kind of error we actually used the Other case for.
- During layer download, drop out early if the timeline is shutting down, so that we don't `error!()`-log the shutdown error in this case.
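
To make the first bullet concrete, here is a minimal, self-contained sketch of the cancellation-aware conversion. The `Timeline` struct below is a simplified stand-in for illustration only; the actual definitions and the full set of `FlushLayerError` variants are in the `pageserver/src/tenant/timeline.rs` diff further down.

```rust
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

/// Simplified stand-in for the pageserver's Timeline: just enough state to
/// show the pattern (the real type carries much more).
struct Timeline {
    cancel: CancellationToken,
}

#[derive(thiserror::Error, Debug, Clone)]
enum FlushLayerError {
    /// The timeline's cancellation token was cancelled.
    #[error("timeline shutting down")]
    Cancelled,

    /// A genuinely unexpected error. Arc<> keeps the variant Clone-able, since
    /// the flush result is propagated out of the flush loop via a watch channel
    /// where it can only be borrowed.
    #[error(transparent)]
    Other(#[from] Arc<anyhow::Error>),
}

impl FlushLayerError {
    /// When crossing from a generic anyhow::Error into this type, explicitly
    /// check for timeline cancellation so a routine shutdown surfaces as
    /// Cancelled rather than as a stacktrace-worthy Other error.
    fn from_anyhow(timeline: &Timeline, err: anyhow::Error) -> Self {
        if timeline.cancel.is_cancelled() {
            Self::Cancelled
        } else {
            Self::Other(Arc::new(err))
        }
    }
}
```

Callers such as the checkpoint API handler and page_service then map `FlushLayerError::Cancelled` onto their own shutdown variant (`ApiError::ShuttingDown`, `QueryError::Shutdown`) instead of wrapping every failure in an internal-error variant.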
jcsp authored May 31, 2024
1 parent c18b1c0 commit 98dadf8
Showing 8 changed files with 102 additions and 56 deletions.
16 changes: 14 additions & 2 deletions pageserver/src/http/routes.rs
@@ -74,6 +74,7 @@ use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
use crate::tenant::SpawnMode;
@@ -1813,11 +1814,22 @@ async fn timeline_checkpoint_handler(
timeline
.freeze_and_flush()
.await
.map_err(ApiError::InternalServerError)?;
.map_err(|e| {
match e {
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(other.into()),

}
})?;
timeline
.compact(&cancel, flags, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
.map_err(|e|
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Other(e) => ApiError::InternalServerError(e)
}
)?;

if wait_until_uploaded {
timeline.remote_client.wait_completion().await.map_err(ApiError::InternalServerError)?;
6 changes: 5 additions & 1 deletion pageserver/src/page_service.rs
@@ -66,6 +66,7 @@ use crate::tenant::mgr::GetTenantError;
use crate::tenant::mgr::ShardResolveResult;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::timeline::FlushLayerError;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -830,7 +831,10 @@ impl PageServerHandler {
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.freeze_and_flush().await?;
timeline.freeze_and_flush().await.map_err(|e| match e {
FlushLayerError::Cancelled => QueryError::Shutdown,
other => QueryError::Other(other.into()),
})?;

info!("done");
Ok(())
18 changes: 13 additions & 5 deletions pageserver/src/pgdatadir_mapping.rs
@@ -78,11 +78,19 @@ pub enum LsnForTimestamp {
}

#[derive(Debug, thiserror::Error)]
pub enum CalculateLogicalSizeError {
pub(crate) enum CalculateLogicalSizeError {
#[error("cancelled")]
Cancelled,

/// Something went wrong while reading the metadata we use to calculate logical size
/// Note that cancellation variants of `PageReconstructError` are transformed to [`Self::Cancelled`]
/// in the `From` implementation for this variant.
#[error(transparent)]
Other(#[from] anyhow::Error),
PageRead(PageReconstructError),

/// Something went wrong deserializing metadata that we read to calculate logical size
#[error("decode error: {0}")]
Decode(#[from] DeserializeError),
}

#[derive(Debug, thiserror::Error)]
@@ -110,7 +118,7 @@ impl From<PageReconstructError> for CalculateLogicalSizeError {
PageReconstructError::AncestorStopping(_) | PageReconstructError::Cancelled => {
Self::Cancelled
}
_ => Self::Other(pre.into()),
_ => Self::PageRead(pre),
}
}
}
@@ -763,7 +771,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get_current_logical_size_non_incremental(
pub(crate) async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -772,7 +780,7 @@

// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let dbdir = DbDirectory::des(&buf)?;

let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
2 changes: 1 addition & 1 deletion pageserver/src/tenant.rs
@@ -4154,7 +4154,7 @@ mod tests {
.await?;
writer.finish_write(lsn);
}
tline.freeze_and_flush().await
tline.freeze_and_flush().await.map_err(|e| e.into())
}

#[tokio::test]
10 changes: 9 additions & 1 deletion pageserver/src/tenant/storage_layer/layer.rs
@@ -366,7 +366,10 @@ impl Layer {
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?;
.map_err(|err| match err {
DownloadError::DownloadCancelled => GetVectoredError::Cancelled,
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;

self.0
.access_stats
@@ -1158,6 +1161,11 @@ impl LayerInner {
let consecutive_failures =
1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);

if timeline.cancel.is_cancelled() {
// If we're shutting down, drop out before logging the error
return Err(e);
}

tracing::error!(consecutive_failures, "layer file download failed: {e:#}");

let backoff = utils::backoff::exponential_backoff_duration_seconds(
100 changes: 57 additions & 43 deletions pageserver/src/tenant/timeline.rs
@@ -138,7 +138,7 @@ use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
pub(crate) enum FlushLoopState {
NotStarted,
Running {
#[cfg(test)]
@@ -577,7 +577,7 @@ impl PageReconstructError {
}

#[derive(thiserror::Error, Debug)]
enum CreateImageLayersError {
pub(crate) enum CreateImageLayersError {
#[error("timeline shutting down")]
Cancelled,

@@ -591,17 +591,35 @@
Other(#[from] anyhow::Error),
}

#[derive(thiserror::Error, Debug)]
enum FlushLayerError {
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
#[error("timeline shutting down")]
Cancelled,

/// We tried to flush a layer while the Timeline is in an unexpected state
#[error("cannot flush frozen layers when flush_loop is not running, state is {0:?}")]
NotRunning(FlushLoopState),

// Arc<> the following non-clonable error types: we must be Clone-able because the flush error is propagated from the flush
// loop via a watch channel, where we can only borrow it.
#[error(transparent)]
CreateImageLayersError(CreateImageLayersError),
CreateImageLayersError(Arc<CreateImageLayersError>),

#[error(transparent)]
Other(#[from] anyhow::Error),
Other(#[from] Arc<anyhow::Error>),
}

impl FlushLayerError {
// When crossing from generic anyhow errors to this error type, we explicitly check
// for timeline cancellation to avoid logging inoffensive shutdown errors as warn/err.
fn from_anyhow(timeline: &Timeline, err: anyhow::Error) -> Self {
if timeline.cancel.is_cancelled() {
Self::Cancelled
} else {
Self::Other(Arc::new(err))
}
}
}

#[derive(thiserror::Error, Debug)]
@@ -696,7 +714,7 @@ impl From<CreateImageLayersError> for FlushLayerError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
any => FlushLayerError::CreateImageLayersError(any),
any => FlushLayerError::CreateImageLayersError(Arc::new(any)),
}
}
}
@@ -1547,13 +1565,13 @@ impl Timeline {

/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
self.freeze_and_flush0().await
}

// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> anyhow::Result<()> {
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let to_lsn = self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait(to_lsn).await
}
@@ -2735,11 +2753,6 @@ impl Timeline {
self.current_logical_size.initialized.add_permits(1);
}

enum BackgroundCalculationError {
Cancelled,
Other(anyhow::Error),
}

let try_once = |attempt: usize| {
let background_ctx = &background_ctx;
let self_ref = &self;
@@ -2757,10 +2770,10 @@
(Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
}
_ = self_ref.cancel.cancelled() => {
return Err(BackgroundCalculationError::Cancelled);
return Err(CalculateLogicalSizeError::Cancelled);
}
_ = cancel.cancelled() => {
return Err(BackgroundCalculationError::Cancelled);
return Err(CalculateLogicalSizeError::Cancelled);
},
() = skip_concurrency_limiter.cancelled() => {
// Some action that is part of a end user interaction requested logical size
@@ -2787,18 +2800,7 @@
.await
{
Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
Err(CalculateLogicalSizeError::Cancelled) => {
Err(BackgroundCalculationError::Cancelled)
}
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
Err(BackgroundCalculationError::Cancelled)
} else {
Err(BackgroundCalculationError::Other(err))
}
}
Err(e) => Err(e),
}
}
};
@@ -2810,8 +2812,11 @@

match try_once(attempt).await {
Ok(res) => return ControlFlow::Continue(res),
Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
Err(BackgroundCalculationError::Other(e)) => {
Err(CalculateLogicalSizeError::Cancelled) => return ControlFlow::Break(()),
Err(
e @ (CalculateLogicalSizeError::Decode(_)
| CalculateLogicalSizeError::PageRead(_)),
) => {
warn!(attempt, "initial size calculation failed: {e:?}");
// exponential back-off doesn't make sense at these long intervals;
// use fixed retry interval with generous jitter instead
@@ -3717,7 +3722,9 @@ impl Timeline {
return;
}
err @ Err(
FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
FlushLayerError::NotRunning(_)
| FlushLayerError::Other(_)
| FlushLayerError::CreateImageLayersError(_),
) => {
error!("could not flush frozen layer: {err:?}");
break err.map(|_| ());
@@ -3763,7 +3770,10 @@ impl Timeline {
/// `last_record_lsn` may be higher than the highest LSN of a frozen layer: if this is the case,
/// it means no data will be written between the top of the highest frozen layer and to_lsn,
/// e.g. because this tenant shard has ingested up to to_lsn and not written any data locally for that part of the WAL.
async fn flush_frozen_layers_and_wait(&self, last_record_lsn: Lsn) -> anyhow::Result<()> {
async fn flush_frozen_layers_and_wait(
&self,
last_record_lsn: Lsn,
) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();

// Increment the flush cycle counter and wake up the flush task.
@@ -3774,7 +3784,7 @@

let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
return Err(FlushLayerError::NotRunning(flush_loop_state));
}

self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
@@ -3787,14 +3797,11 @@
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= my_flush_request {
if let Err(_err) = last_result {
if let Err(err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
// here, because it might not be Cloneable
anyhow::bail!(
"Could not flush frozen layer. Request id: {}",
my_flush_request
);
return Err(err.clone());
} else {
return Ok(());
}
@@ -3803,7 +3810,7 @@
trace!("waiting for flush to complete");
tokio::select! {
rx_e = rx.changed() => {
rx_e?;
rx_e.map_err(|_| FlushLayerError::NotRunning(*self.flush_loop_state.lock().unwrap()))?;
},
// Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
// the notification from [`flush_loop`] that it completed.
@@ -3875,7 +3882,8 @@ impl Timeline {
EnumSet::empty(),
ctx,
)
.await?;
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;

if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
@@ -3899,7 +3907,8 @@
Some(metadata_keyspace.0.ranges[0].clone()),
ctx,
)
.await?
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e))?
} else {
None
};
@@ -3926,7 +3935,11 @@
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
let Some(layer) = self
.create_delta_layer(&frozen_layer, None, ctx)
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e))?
else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
@@ -3959,7 +3972,8 @@

if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn
self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
self.schedule_uploads(disk_consistent_lsn, layers_to_upload)
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
}
// release lock on 'layers'
};
4 changes: 2 additions & 2 deletions pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;

use super::{layer_manager::LayerManager, Timeline};
use super::{layer_manager::LayerManager, FlushLayerError, Timeline};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
@@ -23,7 +23,7 @@ pub(crate) enum Error {
#[error("shutting down, please retry later")]
ShuttingDown,
#[error("flushing failed")]
FlushAncestor(#[source] anyhow::Error),
FlushAncestor(#[source] FlushLayerError),
#[error("layer download failed")]
RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
#[error("copying LSN prefix locally failed")]

1 comment on commit 98dadf8

@github-actions


3238 tests run: 3092 passed, 0 failed, 146 skipped (full report)


Flaky tests (1)

Postgres 16

  • test_download_remote_layers_api: debug

Code coverage* (full report)

  • functions: 31.4% (6492 of 20681 functions)
  • lines: 48.4% (50210 of 103783 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
98dadf8 at 2024-05-31T09:41:36.552Z :recycle:
