refactor(Timeline::shutdown): rely more on Timeline::cancel; use it from deletion code path (#7233)

This PR is fallout from work on #7062.

# Changes

- Unify the freeze-and-flush and hard shutdown code paths into a single
method `Timeline::shutdown` that takes the shutdown mode as an argument.
- Replace the `freeze_and_flush` bool arg in callers with that mode
argument, which makes call sites more expressive (see the sketch after this list).
- Switch timeline deletion to use `Timeline::shutdown` instead of its
own slightly-out-of-sync copy.
- Remove usage of `task_mgr::shutdown_watcher` /
`task_mgr::shutdown_token` where possible
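
For reference, here is the new mode argument as it appears in the `pageserver/src/tenant/timeline.rs` diff below (doc comments abbreviated; the call-site examples are illustrative):

```rust
/// Argument to `Timeline::shutdown`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownMode {
    /// Graceful shutdown: flush open layers to disk and then to remote storage.
    /// Can easily take multiple seconds for a busy timeline.
    FreezeAndFlush,
    /// Shut down immediately, without waiting for any open layers to flush.
    Hard,
}
```

Call sites now name their intent: `tenant.shutdown(progress, ShutdownMode::FreezeAndFlush)` for pageserver shutdown, versus `tenant.shutdown(progress, ShutdownMode::Hard)` for deletion, detach, and shard splits.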

# Future Work

Do we really need `freeze_and_flush`?
If we could get rid of it, there would be no need for a specific
shutdown order.

Also, if you undo this patch's changes to `eviction_task.rs` and
enable `RUST_LOG=debug`, it's easy to see that we leave a task
hanging that logs under the span `Connection{...}` at debug level. I think
it's a pre-existing issue; it's probably a broker client task.
problame committed Apr 3, 2024
1 parent 36b8753 commit b30b15e
Showing 7 changed files with 140 additions and 139 deletions.
21 changes: 7 additions & 14 deletions pageserver/src/tenant.rs
@@ -1783,7 +1783,7 @@ impl Tenant {
async fn shutdown(
&self,
shutdown_progress: completion::Barrier,
freeze_and_flush: bool,
shutdown_mode: timeline::ShutdownMode,
) -> Result<(), completion::Barrier> {
span::debug_assert_current_span_has_tenant_id();

@@ -1830,16 +1830,8 @@ impl Tenant {
timelines.values().for_each(|timeline| {
let timeline = Arc::clone(timeline);
let timeline_id = timeline.timeline_id;

let span =
tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
js.spawn(async move {
if freeze_and_flush {
timeline.flush_and_shutdown().instrument(span).await
} else {
timeline.shutdown().instrument(span).await
}
});
let span = tracing::info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
})
};
// test_long_timeline_create_then_tenant_delete is leaning on this message
@@ -3866,6 +3858,7 @@ mod tests {
use hex_literal::hex;
use pageserver_api::keyspace::KeySpace;
use rand::{thread_rng, Rng};
use tests::timeline::ShutdownMode;

static TEST_KEY: Lazy<Key> =
Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
@@ -4311,7 +4304,7 @@ mod tests {
make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
// so that all uploads finish & we can call harness.load() below again
tenant
.shutdown(Default::default(), true)
.shutdown(Default::default(), ShutdownMode::FreezeAndFlush)
.instrument(harness.span())
.await
.ok()
@@ -4352,7 +4345,7 @@ mod tests {

// so that all uploads finish & we can call harness.load() below again
tenant
.shutdown(Default::default(), true)
.shutdown(Default::default(), ShutdownMode::FreezeAndFlush)
.instrument(harness.span())
.await
.ok()
@@ -5133,7 +5126,7 @@ mod tests {
// Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
.shutdown(super::timeline::ShutdownMode::Hard)
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
.await;
std::mem::forget(tline);
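
The `Tenant::shutdown` hunk above fans out over all timelines with a `JoinSet`, giving each per-timeline shutdown its own tracing span. A minimal, self-contained sketch of that pattern (the `Timeline` stub and `main` are illustrative stand-ins, not pageserver code):

```rust
use std::sync::Arc;

use tokio::task::JoinSet;
use tracing::{info_span, Instrument};

#[derive(Debug, Clone, Copy)]
enum ShutdownMode {
    FreezeAndFlush,
    Hard,
}

struct Timeline {
    timeline_id: u64,
}

impl Timeline {
    async fn shutdown(&self, mode: ShutdownMode) {
        // stand-in for the real per-timeline shutdown work
        tracing::debug!(?mode, "shutting down");
    }
}

#[tokio::main]
async fn main() {
    let timelines = vec![
        Arc::new(Timeline { timeline_id: 1 }),
        Arc::new(Timeline { timeline_id: 2 }),
    ];
    let shutdown_mode = ShutdownMode::FreezeAndFlush;

    let mut js = JoinSet::new();
    for timeline in timelines {
        let timeline_id = timeline.timeline_id;
        // one span per timeline, as in the hunk above
        let span = info_span!("timeline_shutdown", %timeline_id, ?shutdown_mode);
        js.spawn(async move { timeline.shutdown(shutdown_mode).instrument(span).await });
    }
    // wait for all per-timeline shutdowns to complete
    while js.join_next().await.is_some() {}
}
```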
7 changes: 5 additions & 2 deletions pageserver/src/tenant/delete.rs
@@ -14,7 +14,10 @@ use crate::{
config::PageServerConf,
context::RequestContext,
task_mgr::{self, TaskKind},
tenant::mgr::{TenantSlot, TenantsMapRemoveResult},
tenant::{
mgr::{TenantSlot, TenantsMapRemoveResult},
timeline::ShutdownMode,
},
};

use super::{
@@ -463,7 +466,7 @@ impl DeleteTenantFlow {
// tenant.shutdown
// Its also bad that we're holding tenants.read here.
// TODO relax set_stopping to be idempotent?
if tenant.shutdown(progress, false).await.is_err() {
if tenant.shutdown(progress, ShutdownMode::Hard).await.is_err() {
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"tenant shutdown is already in progress"
)));
17 changes: 8 additions & 9 deletions pageserver/src/tenant/mgr.rs
@@ -44,6 +44,7 @@ use crate::tenant::config::{
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};

@@ -783,11 +784,9 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let freeze_and_flush = true;

let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, freeze_and_flush).await
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
};

if let Err(other_progress) = res {
@@ -1107,7 +1106,7 @@ impl TenantManager {
};

info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
@@ -1223,7 +1222,7 @@ impl TenantManager {
TenantSlot::Attached(tenant) => {
let (_guard, progress) = utils::completion::channel();
info!("Shutting down just-spawned tenant, because tenant manager is shut down");
match tenant.shutdown(progress, false).await {
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
info!("Finished shutting down just-spawned tenant");
}
@@ -1273,7 +1272,7 @@ impl TenantManager {
};

let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, false).await {
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
@@ -1677,7 +1676,7 @@ impl TenantManager {

// Phase 5: Shut down the parent shard, and erase it from disk
let (_guard, progress) = completion::channel();
match parent.shutdown(progress, false).await {
match parent.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {}
Err(other) => {
other.wait().await;
@@ -2664,11 +2663,11 @@ where
let attached_tenant = match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
let shutdown_mode = ShutdownMode::Hard;

// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, freeze_and_flush).await {
match tenant.shutdown(progress, shutdown_mode).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
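
Every `tenant.shutdown(progress, ...)` call site above follows the same completion idiom: the caller holds `_guard` and passes the `progress` barrier in; a caller that loses the race gets `Err(barrier)` back and can `wait().await` on it. A rough sketch of that guard/barrier idiom built on plain tokio primitives (neon's actual `utils::completion` crate may be implemented differently; this is an assumption-labeled stand-in):

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

// Well below tokio's Semaphore::MAX_PERMITS, yet enough to wake any number of waiters.
const WAKE_ALL: usize = usize::MAX >> 4;

pub struct Guard(Arc<Semaphore>);

#[derive(Clone)]
pub struct Barrier(Arc<Semaphore>);

/// Create a linked (guard, barrier) pair: the barrier resolves once the guard drops.
pub fn channel() -> (Guard, Barrier) {
    let sem = Arc::new(Semaphore::new(0));
    (Guard(sem.clone()), Barrier(sem))
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Dropping the guard releases everyone waiting on the barrier.
        self.0.add_permits(WAKE_ALL);
    }
}

impl Barrier {
    pub async fn wait(&self) {
        // Pends until the guard is dropped; the permit is returned on drop.
        let _permit = self.0.acquire().await;
    }
}

#[tokio::main]
async fn main() {
    let (guard, barrier) = channel();
    let waiter = tokio::spawn(async move { barrier.wait().await });
    drop(guard); // "shutdown finished"
    waiter.await.unwrap();
}
```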
2 changes: 1 addition & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
@@ -1569,7 +1569,7 @@ impl RemoteTimelineClient {
/// Use [`RemoteTimelineClient::shutdown`] for graceful stop.
///
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// Use `task_mgr::shutdown_tasks(Some(TaskKind::RemoteUploadTask), Some(self.tenant_shard_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub(crate) fn stop(&self) {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
173 changes: 105 additions & 68 deletions pageserver/src/tenant/timeline.rs
@@ -619,6 +619,19 @@ pub(crate) enum WaitLsnWaiter<'a> {
PageService,
}

/// Argument to [`Timeline::shutdown`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownMode {
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
/// the call to [`Timeline::shutdown`].
FreezeAndFlush,
/// Shut down immediately, without waiting for any open layers to flush.
Hard,
}

/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -1306,86 +1319,119 @@ impl Timeline {
self.launch_eviction_task(parent, background_jobs_can_start);
}

/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
/// After this function returns, no timeline-scoped tasks are left running.
///
/// The preferred pattern is:
/// - in any spawned tasks, keep Timeline::guard open + Timeline::cancel / child token
/// - if early shutdown (not just cancellation) of a sub-tree of tasks is required,
/// go the extra mile and keep track of JoinHandles
/// - Keep track of JoinHandles using a passed-down `Arc<Mutex<Option<JoinSet>>>` or similar,
/// instead of spawning directly on a runtime. It is a more composable / testable pattern.
///
/// While we are flushing, we continue to accept read I/O.
pub(crate) async fn flush_and_shutdown(&self) {
/// For legacy reasons, we still have multiple tasks spawned using
/// `task_mgr::spawn(X, Some(tenant_id), Some(timeline_id))`.
/// We refer to these as "timeline-scoped task_mgr tasks".
/// Some of these tasks are already sensitive to Timeline::cancel while others are
/// not sensitive to Timeline::cancel and instead respect [`task_mgr::shutdown_token`]
/// or [`task_mgr::shutdown_watcher`].
/// We want to gradually convert the code base away from these.
///
/// Here is an inventory of timeline-scoped task_mgr tasks that are still sensitive to
/// `task_mgr::shutdown_{token,watcher}` (there are also tenant-scoped and global-scoped
/// ones that aren't mentioned here):
/// - [`TaskKind::TimelineDeletionWorker`]
/// - NB: also used for tenant deletion
/// - [`TaskKind::RemoteUploadTask`]
/// - [`TaskKind::InitialLogicalSizeCalculation`]
/// - [`TaskKind::DownloadAllRemoteLayers`] (can we get rid of it?)
/// Inventory of timeline-scoped task_mgr tasks that use spawn but aren't sensitive to `task_mgr::shutdown_{token,watcher}`:
/// - [`TaskKind::Eviction`]
/// - [`TaskKind::LayerFlushTask`]
/// - [`TaskKind::OndemandLogicalSizeCalculation`]
/// - [`TaskKind::GarbageCollector`] (immediate_gc is timeline-scoped)
pub(crate) async fn shutdown(&self, mode: ShutdownMode) {
debug_assert_current_span_has_tenant_and_timeline_id();

// Stop ingesting data. Walreceiver only provides cancellation but no
// "wait until gone", because it uses the Timeline::gate. So, only
// after the self.gate.close() in self.shutdown() below will we know for
// sure that no walreceiver tasks are left.
// This means that we might still be ingesting data during the call to
// `self.freeze_and_flush()` below. That's not ideal, but, we don't have
// the concept of a ChildGuard, which is what we'd need to properly model
// early shutdown of the walreceiver task sub-tree before the other
// Timeline task sub-trees.
if let Some(walreceiver) = self.walreceiver.lock().unwrap().take() {
let try_freeze_and_flush = match mode {
ShutdownMode::FreezeAndFlush => true,
ShutdownMode::Hard => false,
};

// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data. Walreceiver only provides
// cancellation but no "wait until gone", because it uses the Timeline::gate.
// So, only after the self.gate.close() below will we know for sure that
// no walreceiver tasks are left.
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
// data during the call to `self.freeze_and_flush()` below.
// That's not ideal, but, we don't have the concept of a ChildGuard,
// which is what we'd need to properly model early shutdown of the walreceiver
// task sub-tree before the other Timeline task sub-trees.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.cancel();
}

// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();

// now all writers to InMemory layer are gone, do the final flush if requested
match self.freeze_and_flush().await {
Ok(_) => {
// drain the upload queue
if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
client.shutdown().await;
if try_freeze_and_flush {
// we shut down walreceiver above, so, we won't add anything more
// to the InMemoryLayer; freeze it and wait for all frozen layers
// to reach the disk & upload queue, then shut the upload queue and
// wait for it to drain.
match self.freeze_and_flush().await {
Ok(_) => {
// drain the upload queue
if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
client.shutdown().await;
}
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to freeze and flush: {e:#}");
}
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to freeze and flush: {e:#}");
}
}

self.shutdown().await;
}

/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
/// the graceful [`Timeline::flush_and_shutdown`] function.
pub(crate) async fn shutdown(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();

// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();

// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
// while doing so.
self.last_record_lsn.shutdown();

// Shut down the layer flush task before the remote client, as one depends on the other
task_mgr::shutdown_tasks(
Some(TaskKind::LayerFlushTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;

// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
// case our caller wants to use that for a deletion
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel.)
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;
}

// TODO: work toward making this a no-op. See this function's doc comment for more context.
tracing::debug!("Waiting for tasks...");

task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;

// Finally wait until any gate-holders are complete
// Finally wait until any gate-holders are complete.
//
// TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
// and use a TBD variant of shutdown_tasks that asserts that there were no tasks left.
self.gate.close().await;

self.metrics.shutdown();
@@ -2475,10 +2521,6 @@ impl Timeline {
debug!("cancelling logical size calculation for timeline shutdown");
calculation.await
}
_ = task_mgr::shutdown_watcher() => {
debug!("cancelling logical size calculation for task shutdown");
calculation.await
}
}
}

@@ -3162,16 +3204,11 @@ impl Timeline {
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
info!("shutting down layer flush task");
break;
},
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
info!("shutting down layer flush task due to Timeline::cancel");
break;
},
_ = layer_flush_start_rx.changed() => {}
}

trace!("waking up");
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
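The last two timeline.rs hunks delete the `task_mgr::shutdown_watcher()` select arms, leaving `Timeline::cancel` as the single cancellation signal for these loops. A minimal sketch of the resulting loop shape (names and the `watch` payload are illustrative; `CancellationToken` comes from the `tokio-util` crate the real code uses):

```rust
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

async fn flush_loop(cancel: CancellationToken, mut layer_flush_start_rx: watch::Receiver<u64>) {
    loop {
        tokio::select! {
            // the only shutdown signal left after this commit
            _ = cancel.cancelled() => {
                tracing::info!("shutting down layer flush task due to Timeline::cancel");
                break;
            }
            _ = layer_flush_start_rx.changed() => {}
        }
        let flush_counter = *layer_flush_start_rx.borrow();
        tracing::trace!(flush_counter, "woken up");
        // ... flush frozen layers until caught up with `flush_counter` ...
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let (_flush_tx, flush_rx) = watch::channel(0u64);
    let task = tokio::spawn(flush_loop(cancel.clone(), flush_rx));
    cancel.cancel(); // what Timeline::shutdown does via Timeline::cancel
    task.await.unwrap();
}
```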

1 comment on commit b30b15e

@github-actions

2828 tests run: 2667 passed, 1 failed, 160 skipped (full report)


Failures on Postgres 16

  • test_tenant_delete_scrubber: debug
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_tenant_delete_scrubber[debug-pg16]"

Test coverage report is not available

The comment gets automatically updated with the latest test results
b30b15e at 2024-04-03T16:56:24.901Z :recycle:
