Skip to content

Commit

Permalink
tokio-epoll-uring: retry on launch failures due to locked memory (#7141)
Browse files Browse the repository at this point in the history
refs #7136

Problem
-------

Before this PR, we were using
`tokio_epoll_uring::thread_local_system()`,
which panics on tokio_epoll_uring::System::launch() failure

As we've learned in [the

past](#6373 (comment)),
some older Linux kernels account io_uring instances as locked memory.

And while we've raised the limit in prod considerably, we did hit it
once on 2024-03-11 16:30 UTC.
That was after we enabled tokio-epoll-uring fleet-wide, but before
we had shipped release-5090 (c6ed86d)
which did away with the last mass-creation of tokio-epoll-uring
instances as per

    commit 3da410c
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Tue Mar 5 10:03:54 2024 +0100

tokio-epoll-uring: use it on the layer-creating code paths (#6378)

Nonetheless, it highlighted that panicking in this situation is probably
not ideal, as it can leave the pageserver process in a semi-broken
state.

Further, due to low sampling rate of Prometheus metrics, we don't know
much about the circumstances of this failure instance.

Solution
--------

This PR implements a custom thread_local_system() that is
pageserver-aware
and will do the following on failure:
- dump relevant stats to `tracing!`, hopefully they will be useful to
  understand the circumstances better
- if it's the locked memory failure (or any other ENOMEM): abort() the
  process
- if it's ENOMEM, retry with exponential back-off, capped at 3s.
- add metric counters so we can create an alert

This makes sense in the production environment where we know that
_usually_, there's ample locked memory allowance available, and we know
the failure rate is rare.
  • Loading branch information
problame committed Mar 15, 2024
1 parent 9752ad8 commit 0694ee9
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 10 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]

disallowed-macros = [
Expand Down
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
procfs.workspace = true
rand.workspace = true
regex.workspace = true
scopeguard.workspace = true
Expand Down
27 changes: 23 additions & 4 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2465,23 +2465,22 @@ impl<F: Future<Output = Result<O, E>>, O, E> Future for MeasuredRemoteOp<F> {
}

pub mod tokio_epoll_uring {
use metrics::UIntGauge;
use metrics::{register_int_counter, UIntGauge};
use once_cell::sync::Lazy;

pub struct Collector {
descs: Vec<metrics::core::Desc>,
systems_created: UIntGauge,
systems_destroyed: UIntGauge,
}

const NMETRICS: usize = 2;

impl metrics::core::Collector for Collector {
fn desc(&self) -> Vec<&metrics::core::Desc> {
self.descs.iter().collect()
}

fn collect(&self) -> Vec<metrics::proto::MetricFamily> {
let mut mfs = Vec::with_capacity(NMETRICS);
let mut mfs = Vec::with_capacity(Self::NMETRICS);
let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
Expand All @@ -2495,6 +2494,8 @@ pub mod tokio_epoll_uring {
}

impl Collector {
const NMETRICS: usize = 2;

#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let mut descs = Vec::new();
Expand Down Expand Up @@ -2528,6 +2529,22 @@ pub mod tokio_epoll_uring {
}
}
}

pub(crate) static THREAD_LOCAL_LAUNCH_SUCCESSES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_success_count",
"Number of times where thread_local_system creation spanned multiple executor threads",
)
.unwrap()
});

pub(crate) static THREAD_LOCAL_LAUNCH_FAILURES: Lazy<metrics::IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_tokio_epoll_uring_pageserver_thread_local_launch_failures_count",
"Number of times thread_local_system creation failed and was retried after back-off.",
)
.unwrap()
});
}

pub(crate) mod tenant_throttling {
Expand Down Expand Up @@ -2656,6 +2673,8 @@ pub fn preinitialize_metrics() {
&WALRECEIVER_BROKER_UPDATES,
&WALRECEIVER_CANDIDATES_ADDED,
&WALRECEIVER_CANDIDATES_REMOVED,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES,
]
.into_iter()
.for_each(|c| {
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/virtual_file/io_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
//!
//!

pub(super) mod tokio_epoll_uring_ext;

use tokio_epoll_uring::{IoBuf, Slice};
use tracing::Instrument;
Expand Down Expand Up @@ -145,7 +149,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -160,7 +164,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -178,7 +182,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand All @@ -197,7 +201,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
Expand All @@ -220,7 +224,7 @@ impl IoEngine {
}
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = system.write(file_guard, offset, buf).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
Expand Down
194 changes: 194 additions & 0 deletions pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't launched.
//!
//! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.

use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};

use tokio_epoll_uring::{System, SystemHandle};

use crate::virtual_file::on_fatal_io_error;

use crate::metrics::tokio_epoll_uring as metrics;

#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle>,
launch_attempts: AtomicU32,
}

impl ThreadLocalState {
pub fn new() -> Self {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
}))
}
pub fn make_id_string(&self) -> String {
format!("0x{:p}", Arc::as_ptr(&self.0))
}
}

impl Drop for ThreadLocalState {
fn drop(&mut self) {
info!(parent: None, id=%self.make_id_string(), "tokio-epoll-uring_ext: ThreadLocalState is being dropped and id might be re-used in the future");
}
}

thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
}

/// Panics if we cannot [`System::launch`].
pub async fn thread_local_system() -> Handle {
let fake_cancel = CancellationToken::new();
loop {
let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
let inner = &thread_local_state.0;
let get_or_init_res = inner
.cell
.get_or_try_init(|| async {
let attempt_no = inner
.launch_attempts
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
async {
// Rate-limit retries per thread-local.
// NB: doesn't yield to executor at attempt_no=0.
utils::backoff::exponential_backoff(
attempt_no,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&fake_cancel,
)
.await;
let res = System::launch()
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {
Ok(system) => {
info!("successfully launched system");
metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
Ok(system)
}
Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
warn!("not enough locked memory to tokio-epoll-uring, will retry");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
Err(())
}
// abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
// This is equivalent to a fatal IO error.
Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
info_span!("stats").in_scope(|| {
emit_launch_failure_process_stats();
});
on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
},
}
}
.instrument(span)
.await
})
.await;
if get_or_init_res.is_ok() {
return Handle(thread_local_state);
}
}
}

fn emit_launch_failure_process_stats() {
// tokio-epoll-uring stats
// vmlck + rlimit
// number of threads
// rss / system memory usage generally

let tokio_epoll_uring::metrics::Metrics {
systems_created,
systems_destroyed,
} = tokio_epoll_uring::metrics::global();
info!(systems_created, systems_destroyed, "tokio-epoll-uring");

match procfs::process::Process::myself() {
Ok(myself) => {
match myself.limits() {
Ok(limits) => {
info!(?limits.max_locked_memory, "/proc/self/limits");
}
Err(error) => {
info!(%error, "no limit stats due to error");
}
}

match myself.status() {
Ok(status) => {
let procfs::process::Status {
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
..
} = status;
info!(
vmsize,
vmlck,
vmpin,
vmrss,
rssanon,
rssfile,
rssshmem,
vmdata,
vmstk,
vmexe,
vmlib,
vmpte,
threads,
"/proc/self/status"
);
}
Err(error) => {
info!(%error, "no status status due to error");
}
}
}
Err(error) => {
info!(%error, "no process stats due to error");
}
};
}

#[derive(Clone)]
pub struct Handle(ThreadLocalState);

impl std::ops::Deref for Handle {
type Target = SystemHandle;

fn deref(&self) -> &Self::Target {
self.0
.0
.cell
.get()
.expect("must be already initialized when using this")
}
}
2 changes: 1 addition & 1 deletion pageserver/src/virtual_file/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl OpenOptions {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
Expand Down
2 changes: 2 additions & 0 deletions workspace_hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ aws-smithy-types = { version = "1", default-features = false, features = ["byte-
axum = { version = "0.6", features = ["ws"] }
base64 = { version = "0.21", features = ["alloc"] }
base64ct = { version = "1", default-features = false, features = ["std"] }
byteorder = { version = "1", features = ["i128"] }
bytes = { version = "1", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clap = { version = "4", features = ["derive", "string"] }
Expand Down Expand Up @@ -86,6 +87,7 @@ zstd-sys = { version = "2", default-features = false, features = ["legacy", "std

[build-dependencies]
anyhow = { version = "1", features = ["backtrace"] }
byteorder = { version = "1", features = ["i128"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
Expand Down

1 comment on commit 0694ee9

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No tests were run or test report is not available

Test coverage report is not available

The comment gets automatically updated with the latest test results
0694ee9 at 2024-03-15T19:47:45.644Z :recycle:

Please sign in to comment.