Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shutdown does not kill walredo processes #8150

Merged
merged 8 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pageserver/benches/bench_walredo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! ```

use anyhow::Context;
use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
Expand Down Expand Up @@ -188,6 +189,7 @@ impl Request {
manager
.request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version)
.await
.context("request_redo")
}

fn pg_record(will_init: bool, bytes: &'static [u8]) -> NeonWalRecord {
Expand Down
19 changes: 17 additions & 2 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::walredo;
use crate::InitializationOrder;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
Expand Down Expand Up @@ -323,6 +324,16 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}

impl WalRedoManager {
pub(crate) async fn shutdown(&self) {
match self {
Self::Prod(mgr) => mgr.shutdown().await,
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}

pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
Expand All @@ -343,7 +354,7 @@ impl WalRedoManager {
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<bytes::Bytes> {
) -> Result<bytes::Bytes, walredo::Error> {
match self {
Self::Prod(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
Expand Down Expand Up @@ -1877,6 +1888,10 @@ impl Tenant {
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;

if let Some(walredo_mgr) = self.walredo_mgr.as_ref() {
walredo_mgr.shutdown().await;
}

// Wait for any in-flight operations to complete
self.gate.close().await;

Expand Down Expand Up @@ -3953,7 +3968,7 @@ pub(crate) mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, walredo::Error> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
Expand Down
190 changes: 150 additions & 40 deletions pageserver/src/walredo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::time::Duration;
use std::time::Instant;
use tracing::*;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use utils::sync::heavier_once_cell;

///
Expand All @@ -53,10 +54,18 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
/// The current [`process::WalRedoProcess`] that is used by new redo requests.
/// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
/// requests don't use the [`heavier_once_cell::Guard`] to keep ahold of the
/// We use [`heavier_once_cell`] for
///
/// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
/// 2. prevent new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
///
/// # Spawning
///
/// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
///
/// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep ahold of the
/// their process object; we use [`Arc::clone`] for that.
///
/// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
/// had that behavior; it's probably unnecessary.
/// The only merit of it is that if one walredo process encounters an error,
Expand All @@ -65,7 +74,63 @@ pub struct PostgresRedoManager {
/// still be using the old redo process. But, those other tasks will most likely
/// encounter an error as well, and errors are an unexpected condition anyway.
/// So, probably we could get rid of the `Arc` in the future.
redo_process: heavier_once_cell::OnceCell<Arc<process::WalRedoProcess>>,
///
/// # Shutdown
///
/// See [`Self::launched_processes`].
redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,

/// Gate that is entered when launching a walredo process and held open
/// until the process has been `kill()`ed and `wait()`ed upon.
///
/// Manager shutdown waits for this gate to close after setting the
/// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
///
/// This type of usage is a bit unusual because gates usually keep track of
/// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
/// But we use it here to keep track of the _processes_ that we have launched,
/// which may outlive any individual redo request because
/// - we keep walredo process around until its quiesced to amortize spawn cost and
/// - the Arc may be held by multiple concurrent redo requests, so, just because
/// you replace the [`Self::redo_process`] cell's content doesn't mean the
/// process gets killed immediately.
///
/// We could simplify this by getting rid of the [`Arc`].
problame marked this conversation as resolved.
Show resolved Hide resolved
/// See the comment on [`Self::redo_process`] for more details.
launched_processes: utils::sync::gate::Gate,
}

/// See [`PostgresRedoManager::redo_process`].
enum ProcessOnceCell {
Spawned(Arc<Process>),
ManagerShutDown,
}

struct Process {
_launched_processes_guard: utils::sync::gate::GateGuard,
process: process::WalRedoProcess,
}

impl std::ops::Deref for Process {
type Target = process::WalRedoProcess;

fn deref(&self) -> &Self::Target {
&self.process
}
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("cancelled")]
Cancelled,
#[error(transparent)]
Other(#[from] anyhow::Error),
}

macro_rules! bail {
($($arg:tt)*) => {
return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
}
}

///
Expand All @@ -88,9 +153,9 @@ impl PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
if records.is_empty() {
anyhow::bail!("invalid WAL redo request with no records");
bail!("invalid WAL redo request with no records");
}

let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
Expand Down Expand Up @@ -148,10 +213,10 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
process: self
.redo_process
.get()
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
process: self.redo_process.get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
ProcessOnceCell::ManagerShutDown => None,
}),
}
}
}
Expand All @@ -170,9 +235,39 @@ impl PostgresRedoManager {
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
launched_processes: utils::sync::gate::Gate::default(),
}
}

/// Shut down the WAL redo manager.
///
/// After this future completes
/// - no redo process is running
/// - no new redo process will be spawned
/// - redo requests that need walredo process will fail with [`Error::Cancelled`]
/// - [`apply_neon`]-only redo requests may still work, but this may change in the future
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn shutdown(&self) {
// prevent new processes from being spawned
let permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
permit
}
Err(permit) => permit,
};
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
self.launched_processes.close().await;
}

/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
Expand Down Expand Up @@ -203,38 +298,48 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());

let (rel, blknum) = key.to_rel_block().context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
let proc: Arc<process::WalRedoProcess> =
match self.redo_process.get_or_init_detached().await {
Ok(guard) => Arc::clone(&guard),
Err(permit) => {
// don't hold poison_guard, the launch code can bail
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
Ok(guard) => match &*guard {
ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
ProcessOnceCell::ManagerShutDown => {
return Err(Error::Cancelled);
}
},
Err(permit) => {
let start = Instant::now();
let proc = Arc::new(Process {
_launched_processes_guard: match self.launched_processes.enter() {
Ok(guard) => guard,
Err(GateError::GateClosed) => unreachable!(
"shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
),
},
process: process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
)
.context("launch walredo process")?,
);
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
self.redo_process.set(Arc::clone(&proc), permit);
proc
}
};
});
let duration = start.elapsed();
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
info!(
duration_ms = duration.as_millis(),
pid = proc.id(),
"launched walredo process"
);
self.redo_process
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
proc
}
};

let started_at = std::time::Instant::now();

Expand Down Expand Up @@ -299,12 +404,17 @@ impl PostgresRedoManager {
match self.redo_process.get() {
None => (),
Some(guard) => {
if Arc::ptr_eq(&proc, &*guard) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
guard.take_and_deinit();
} else {
// Another task already spawned another redo process (further up in this method)
// and put it into `redo_process`. Do nothing, our view of the world is behind.
match &*guard {
ProcessOnceCell::ManagerShutDown => {}
ProcessOnceCell::Spawned(guard_proc) => {
if Arc::ptr_eq(&proc, guard_proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
guard.take_and_deinit();
} else {
// Another task already spawned another redo process (further up in this method)
// and put it into `redo_process`. Do nothing, our view of the world is behind.
}
}
}
}
}
Expand All @@ -315,7 +425,7 @@ impl PostgresRedoManager {
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
return result;
return result.map_err(Error::Other);
}
}
}
Expand All @@ -329,7 +439,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, Error> {
let start_time = Instant::now();

let mut page = BytesMut::new();
Expand All @@ -338,7 +448,7 @@ impl PostgresRedoManager {
page.extend_from_slice(&fpi[..]);
} else {
// All the current WAL record types that we can handle require a base image.
anyhow::bail!("invalid neon WAL redo request with no base image");
bail!("invalid neon WAL redo request with no base image");
}

// Apply all the WAL records in the batch
Expand Down
Loading