tokio-epoll-uring: use it on the layer-creating code paths #6378

Merged · 66 commits · Mar 5, 2024

Commits (66)
a3c88a8
tokio-epoll-uring integration: support more operations
problame Jan 17, 2024
c2d8251
WIP: statx
problame Jan 19, 2024
a0032f4
impl
problame Feb 5, 2024
0548374
[DO NOT MERGE} CI: test with both io engines
problame Feb 5, 2024
e824c84
clippy
problame Feb 5, 2024
29b9628
fix clippy in tokio-epoll-uring
problame Feb 5, 2024
c2e9cf4
for real
problame Feb 5, 2024
4fa0181
fix macos build
problame Feb 6, 2024
aaccb7a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/more-ops
problame Feb 7, 2024
9a4880e
WIP
problame Feb 7, 2024
b5a00b0
refactor(disk_btree): make BlockWriter::write_blk infallible
problame Feb 7, 2024
7ba1949
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/btree-b…
problame Feb 7, 2024
91d3e25
finish
problame Feb 7, 2024
6f65648
Revert "refactor(disk_btree): make BlockWriter::write_blk infallible"
problame Feb 7, 2024
85d5fc6
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
edabce6
use right branch
problame Feb 7, 2024
e5d15df
WIP
problame Feb 7, 2024
2a39457
WIP
problame Feb 7, 2024
4659794
WIP
problame Feb 7, 2024
14bdd84
it turns out one wants to take BoundedBuf, not Slice<T>
problame Feb 7, 2024
4fe3b49
make tests pass
problame Feb 7, 2024
a6605a1
pull bunch of changes down
problame Feb 7, 2024
c92e8a7
don't pull that in
problame Feb 7, 2024
b0144e2
update lib
problame Feb 7, 2024
54561a8
fixup
problame Feb 7, 2024
6c47083
we can't use impl IoBuf for Array
problame Feb 7, 2024
238296a
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
207764b
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
e5da261
work around BoundedBuf.slice(0..x) panicking for x == 0
problame Feb 7, 2024
26e51c7
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 7, 2024
6f4d182
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
33f3053
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 7, 2024
33261e4
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/refacto…
problame Feb 7, 2024
720f633
refactor(virtual_file) make write_all_at take owned buffers
problame Feb 7, 2024
368f2ac
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Feb 8, 2024
3d8f5d1
fix copy-pasta
problame Feb 8, 2024
183c86c
clean up Cargo.toml
problame Feb 8, 2024
a6d022a
WIP
problame Feb 12, 2024
dff1b9b
implement write_at IoEngine function & wire it up; only discovered no…
problame Feb 12, 2024
64799e6
Merge remote-tracking branch 'origin/problame/integrate-tokio-epoll-u…
problame Feb 12, 2024
8998178
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 12, 2024
98fe109
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Feb 12, 2024
bfdd86c
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Feb 12, 2024
87f05ca
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 13, 2024
ebffde1
Merge branch 'main' into problame/integrate-tokio-epoll-uring/write-p…
problame Feb 14, 2024
ab51748
formatting
problame Feb 14, 2024
6a55ac7
complete comment; https://github.com/neondatabase/neon/pull/6673#disc…
problame Feb 14, 2024
0316adc
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
0c50ee9
Merge branch 'problame/integrate-tokio-epoll-uring/write-path/virtual…
problame Mar 1, 2024
2c30090
weird formatting fluke during merge
problame Mar 1, 2024
32358cf
layer file creation: remove redundant fsync()s
problame Mar 1, 2024
4d8b9e3
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
ebf2de5
layer file creation: fsync timeline directories using VirtualFile::sy…
problame Mar 1, 2024
ca2ed0b
layer file creation: fatal_err the timeline dir fsync
problame Mar 1, 2024
ce251ac
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
c4f7a19
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
1fe80d7
rebase on fatal_err changes
problame Mar 1, 2024
5528b16
Merge remote-tracking branch 'origin/main' into problame/integrate-to…
problame Mar 1, 2024
c972d17
Merge branch 'problame/integrate-tokio-epoll-uring/layer-write-path-f…
problame Mar 1, 2024
7299a0a
Merge branch 'problame/integrate-tokio-epoll-uring/create-layer-fatal…
problame Mar 1, 2024
348649d
Merge branch 'problame/integrate-tokio-epoll-uring/ioengine-par-fsync…
problame Mar 1, 2024
0b268fc
avoid `spawn_blocking(Handle::block_on())` for create_delta_layer
problame Mar 1, 2024
00be7e1
Merge branch 'main' into problame/integrate-tokio-epoll-uring/more-ops
problame Mar 4, 2024
ffbe466
fix doc check
problame Mar 4, 2024
f471a42
Revert "[DO NOT MERGE} CI: test with both io engines"
problame Mar 4, 2024
bdd5228
fix macos build
problame Mar 4, 2024
14 changes: 10 additions & 4 deletions pageserver/src/tenant/blob_io.rs
@@ -12,7 +12,7 @@
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
@@ -127,7 +127,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// You need to make sure that the internal buffer is empty, otherwise
/// data will be written in wrong order.
#[inline(always)]
async fn write_all_unbuffered<B: BoundedBuf>(
async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
@@ -162,7 +162,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}

/// Internal, possibly buffered, write function
async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
if !BUFFERED {
assert!(self.buf.is_empty());
return self.write_all_unbuffered(src_buf).await;
@@ -210,7 +213,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {

/// Write a blob of data. Returns the offset that it was written to,
/// which can be used to retrieve the data later.
pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
srcbuf: B,
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;

let len = srcbuf.bytes_init();
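The signature changes in this file replace the loose `B: BoundedBuf` bound with `B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send`, because the buffer may now be submitted to the io_uring engine and therefore has to be an owned, sendable `IoBuf` that can be handed back to the caller. A minimal caller-side sketch of the resulting shape, assuming a `BlobWriter` named `writer` and that `Vec<u8>` implements the engine's `IoBuf` trait as it does in tokio-uring (the surrounding setup is not part of this PR):

    // Ownership of the buffer moves into write_blob and is returned with the
    // result, since tokio-epoll-uring needs the buffer to outlive the syscall.
    let buf: Vec<u8> = b"blob payload".to_vec();
    let (buf, res) = writer.write_blob(buf).await;
    let offset: u64 = res?; // offset at which the blob was written
    // `buf` is usable again here, whether or not the write succeeded.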
1 change: 1 addition & 0 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -195,6 +195,7 @@ impl Layer {
let downloaded = resident.expect("just initialized");

// if the rename works, the path is as expected
// TODO: sync system call
std::fs::rename(temp_path, owner.local_path())
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;

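The new `// TODO: sync system call` comment flags that `std::fs::rename` alone does not make the renamed layer file durable: the directory entry also has to be fsynced. A short background sketch of that pattern in plain `std` (this helper is illustrative only, not code from this PR):

    use std::fs;
    use std::io;
    use std::path::Path;

    // Rename a temp file into place and persist the new directory entry.
    fn durable_rename(tmp: &Path, dst: &Path) -> io::Result<()> {
        fs::rename(tmp, dst)?;
        // Opening the parent directory read-only is enough to fsync it on Linux.
        let dir = fs::File::open(dst.parent().expect("destination has a parent"))?;
        dir.sync_all()
    }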
78 changes: 41 additions & 37 deletions pageserver/src/tenant/timeline.rs
@@ -3410,44 +3410,48 @@ impl Timeline {
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<ResidentLayer> {
let span = tracing::info_span!("blocking");
let new_delta: ResidentLayer = tokio::task::spawn_blocking({
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
move || {
Handle::current().block_on(
async move {
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
//
// We use fatal_err() below because, after write_to_disk returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir =
VirtualFile::open(&self_clone.conf.timeline_path(
&self_clone.tenant_shard_id,
&self_clone.timeline_id,
))
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
anyhow::Ok(new_delta)
}
.instrument(span),
)
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
let work = async move {
let new_delta = frozen_layer.write_to_disk(&self_clone, &ctx).await?;
// The write_to_disk() above calls writer.finish() which already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
//
// We use fatal_err() below because, after write_to_disk returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir = VirtualFile::open(
&self_clone
.conf
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
anyhow::Ok(new_delta)
};
// Before tokio-epoll-uring, we ran write_to_disk & the sync_all inside spawn_blocking.
// Preserve that behavior to maintain the same behavior for `virtual_file_io_engine=std-fs`.
use crate::virtual_file::io_engine::IoEngine;
match crate::virtual_file::io_engine::get() {
IoEngine::NotSet => panic!("io engine not set"),
IoEngine::StdFs => {
let span = tracing::info_span!("blocking");
tokio::task::spawn_blocking({
move || Handle::current().block_on(work.instrument(span))
})
.await
.context("spawn_blocking")
.and_then(|x| x)
}
})
.await
.context("spawn_blocking")
.and_then(|x| x)?;

Ok(new_delta)
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => work.await,
}
}

async fn repartition(
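The restructuring above builds the layer-writing future once as `work` and then dispatches on the configured io engine: with `std-fs` the future still runs inside `spawn_blocking` plus `Handle::block_on`, preserving the old behavior, while with `tokio-epoll-uring` it is awaited directly on the executor. A stripped-down sketch of that bridging pattern (the function name and the boolean switch are assumptions for illustration, not the pageserver's dispatch code):

    use tokio::runtime::Handle;

    async fn run_on_engine<F, T>(work: F, engine_is_sync: bool) -> anyhow::Result<T>
    where
        F: std::future::Future<Output = anyhow::Result<T>> + Send + 'static,
        T: Send + 'static,
    {
        if engine_is_sync {
            // std-fs: the future issues blocking syscalls, so run it on a
            // dedicated blocking thread and bridge back in with block_on.
            tokio::task::spawn_blocking(move || Handle::current().block_on(work))
                .await
                .map_err(anyhow::Error::new)?
        } else {
            // tokio-epoll-uring: the I/O is genuinely async, await it in place.
            work.await
        }
    }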
105 changes: 76 additions & 29 deletions pageserver/src/virtual_file.rs
@@ -17,20 +17,21 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::fs::File;
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};

use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;

pub use pageserver_api::models::virtual_file as api;
pub(crate) mod io_engine;
mod metadata;
mod open_options;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;

///
@@ -435,13 +436,25 @@ impl VirtualFile {

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
with_file!(self, StorageIoOperation::Fsync, |file_guard| {
let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
res
})
}

pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
/// Call File::sync_data() on the underlying File.
pub async fn sync_data(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file_guard| {
let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
res
})
}

pub async fn metadata(&self) -> Result<Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file_guard| {
let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
res
})
}

/// Helper function internal to `VirtualFile` that looks up the underlying File,
@@ -579,7 +592,7 @@
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at<B: BoundedBuf>(
pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&self,
buf: B,
mut offset: u64,
@@ -590,8 +603,9 @@
}
let mut buf = buf.slice(0..buf_len);
while !buf.is_empty() {
// TODO: push `buf` further down
match self.write_at(&buf, offset).await {
let res;
(buf, res) = self.write_at(buf, offset).await;
match res {
Ok(0) => {
return (
Slice::into_inner(buf),
@@ -605,7 +619,7 @@
buf = buf.slice(n..);
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
@@ -616,15 +630,19 @@
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
// TODO: push `Slice` further down
match self.write(&buf).await {
let res;
(buf, res) = self.write(buf).await;
match res {
Ok(0) => {
return (
Slice::into_inner(buf),
Expand All @@ -644,11 +662,18 @@ impl VirtualFile {
(Slice::into_inner(buf), Ok(nbytes))
}

async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
async fn write<B: IoBuf + Send>(
&mut self,
buf: Slice<B>,
) -> (Slice<B>, Result<usize, std::io::Error>) {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
let (buf, res) = self.write_at(buf, pos).await;
let n = match res {
Ok(n) => n,
Err(e) => return (buf, Err(e)),
};
self.pos += n as u64;
Ok(n)
(buf, Ok(n))
}

pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
@@ -676,16 +701,30 @@ impl VirtualFile {
})
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
.add(size as i64);
}
result
async fn write_at<B: IoBuf + Send>(
&self,
buf: Slice<B>,
offset: u64,
) -> (Slice<B>, Result<usize, Error>) {
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Write, {
let ((_file_guard, buf), result) =
io_engine::get().write_at(file_guard, offset, buf).await;
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&[
"write",
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.add(size as i64);
}
(buf, result)
})
}
}

@@ -1083,6 +1122,7 @@ mod tests {
use rand::Rng;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;

enum MaybeVirtualFile {
@@ -1103,7 +1143,11 @@ impl MaybeVirtualFile {
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&self,
buf: B,
offset: u64,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset).await;
@@ -1124,7 +1168,10 @@ impl MaybeVirtualFile {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
&mut self,
buf: B,
) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf).await;
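Taken together, the `VirtualFile` write path now follows the owned-buffer convention throughout: `write_at`, `write`, `write_all`, and `write_all_at` accept the buffer by value and return it next to the result, because with io_uring the kernel may still reference the buffer after the calling future is dropped. A caller-side sketch of the new shape (the variable names and offsets here are illustrative only):

    // The buffer is moved into the call and handed back with the outcome, so
    // the allocation can be reused across writes instead of being re-borrowed.
    let buf: Vec<u8> = vec![0u8; 8192];
    let (buf, res) = virtual_file.write_all_at(buf, offset).await;
    res?; // on error the buffer still comes back and can be inspected or reused
    let (_buf, res) = virtual_file.write_all_at(buf, offset + 8192).await;
    res?;

As the doc comment on `write_all` notes, the value returned is the underlying `B::Buf`, so a `Slice` passed in comes back as its inner buffer rather than the original slice.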