Skip to content

Commit

Permalink
tokio-epoll-uring: retry on launch failures due to locked memory
Browse files Browse the repository at this point in the history
refs #7136

Problem
-------

Before this PR, we were using `tokio_epoll_uring::thread_local_system()`,
which panics on tokio_epoll_uring::System::launch() failure

As we've learned in [the
past](#6373 (comment)),
some older Linux kernels account io_uring instances as locked memory.

And while we've raised the limit in prod considerably, we did hit it
once on 2024-03-11 16:30 UTC.
That was after we enabled tokio-epoll-uring fleet-wide, but before
we had shipped release-5090 (c6ed86d)
which did away with the last mass-creation of tokio-epoll-uring
instances as per

    commit 3da410c
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Tue Mar 5 10:03:54 2024 +0100

        tokio-epoll-uring: use it on the layer-creating code paths (#6378)

Nonetheless, it highlighted that panicking in this situation is probably
not ideal, as it can leave the pageserver process in a semi-broken state.

Further, due to low sampling rate of Prometheus metrics, we don't know
much about the circumstances of this failure instance.

Solution
--------

This PR implements a custom thread_local_system() that is pageserver-aware
and will do the following on failure:
- dump relevant stats to `tracing!`, hopefully they will be useful to
  understand the circumstances better
- if it's the locked memory failure (or any other ENOMEM): abort() the
  process
- if it's ENOMEM, retry with exponential back-off, capped at 3s.
- add metric counters so we can create an alert

This makes sense in the production environment where we know that
_usually_, there's ample locked memory allowance available, and we know
the failure rate is rare.
  • Loading branch information
problame committed Mar 15, 2024
1 parent 49bc734 commit 81c5759
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 12 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]

disallowed-macros = [
Expand Down
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
procfs.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true
Expand Down
27 changes: 23 additions & 4 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2465,23 +2465,22 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}

pub mod tokio_epoll_uring {
use metrics::UIntGauge;
use metrics::{register_int_counter, UIntGauge};
use once_cell::sync::Lazy;

pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
}

const NMETRICS: usize = 2;

impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}

fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
Expand All @@ -2495,6 +2494,8 @@ pub mod tokio_epoll_uring {
}

impl Collector {
const NMETRICS: usize = 2;

#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
Expand Down Expand Up @@ -2528,6 +2529,22 @@ pub mod tokio_epoll_uring {
}
}
}

pub(crate) static THREAD_LOCAL_LAUNCH_SUCCESSES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_success_count",
"Number of times where thread_local_system creation spanned multiple executor threads",
)
.unwrap()
});

pub(crate) static THREAD_LOCAL_LAUNCH_FAILURES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_failures_count",
"Number of times thread_local_system creation failed and was retried after back-off.",
)
.unwrap()
});
}

pub(crate) mod tenant_throttling {
Expand Down Expand Up @@ -2656,6 +2673,8 @@ pub fn preinitialize_metrics() {
&WALRECEIVER_BROKER_UPDATES,
&WALRECEIVER_CANDIDATES_ADDED,
&WALRECEIVER_CANDIDATES_REMOVED,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES,
]
.into_iter()
.for_each(|c| {
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/virtual_file/io_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
//!
//!

pub(super) mod tokio_epoll_uring_ext;

use tokio_epoll_uring::{IoBuf, Slice};
use tracing::Instrument;
Expand Down Expand Up @@ -145,7 +149,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -160,7 +164,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -178,7 +182,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -197,7 +201,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
Expand All @@ -220,7 +224,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.write(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand Down
195 changes: 195 additions & 0 deletions pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't launched.
//!
//! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391 for more details.

use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};

use tokio_epoll_uring::{System, SystemHandle};

use crate::virtual_file::on_fatal_io_error;

use crate::metrics::tokio_epoll_uring as metrics;

#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle>,
launch_attempts: AtomicU32,
}

impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
}))
}
pub fn make_id_string(&self) -> String {
format!("0x{:p}", Arc::as_ptr(&self.0))
}
}

impl Drop for ThreadLocalState {
fn drop(&mut self) {
info!(parent: None, id=%self.make_id_string(), "tokio-epoll-uring_ext: ThreadLocalState is being dropped and id might be re-used in the future");
}
}

thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
}

/// Panics if we cannot [`System::launch`].
pub async fn thread_local_system() -> Handle {
let fake_cancel = CancellationToken::new();
loop {
let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
let inner = &thread_local_state.0;
// NB: thread_id becomes stale after first await
let get_or_init_res = inner
.cell
.get_or_try_init(|| async {
let attempt_no = inner
.launch_attempts
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
async {
// Rate-limit retries per thread-local.
// NB: doesn't yield to executor at attempt_no=0.
utils::backoff::exponential_backoff(
attempt_no,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&fake_cancel,
)
.await;
let res = System::launch()
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Ok(system) => {
info!("successfully launched system");
metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
Ok(system)
}
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
warn!("not enough locked memory to tokio-epoll-uring, will retry");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
// This is equivalent to a fatal IO error.
Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
},
}
}
.instrument(span)
.await
})
.await;
if get_or_init_res.is_ok() {
return Handle(thread_local_state);
}
}
}

fn emit_launch_failure_process_stats() {
// tokio-epoll-uring stats
// vmlck + rlimit
// number of threads
// rss / system memory usage generally

let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
info!(systems_created, systems_destroyed, "tokio-epoll-uring");

match procfs::process::Process::myself() {
Ok(myself) => {
match myself.limits() {
Ok(limits) => {
info!(?limits.max_locked_memory, "/proc/self/limits");
}
Err(error) => {
info!(%error, "no limit stats due to error");
}
}

match myself.status() {
Ok(status) => {
let procfs::process::Status {
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
..
} = status;
info!(
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
"/proc/self/status"
);
}
Err(error) => {
info!(%error, "no status status due to error");
}
}
}
Err(error) => {
info!(%error, "no process stats due to error");
}
};
}

#[derive(Clone)]
pub struct Handle(ThreadLocalState);

impl std::ops::Deref for Handle {
type Target = SystemHandle;

fn deref(&self) -> &Self::Target {
self.0
.0
.cell
.get()
.expect("must be already initialized when using this")
}
}
2 changes: 1 addition & 1 deletion pageserver/src/virtual_file/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl OpenOptions {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
Expand Down

0 comments on commit 81c5759

Please sign in to comment.