Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

virtual_file: take a Slice in the read APIs, eliminate read_exact_at_n, fix UB for engine std-fs #8186

Merged
merged 12 commits into from
Jun 28, 2024
6 changes: 4 additions & 2 deletions pageserver/src/tenant/vectored_blob_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::num::NonZeroUsize;

use bytes::BytesMut;
use pageserver_api::key::Key;
use tokio_epoll_uring::BoundedBuf;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;

Expand Down Expand Up @@ -316,8 +317,9 @@ impl<'a> VectoredBlobReader<'a> {
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size(), ctx)
.await?;
.read_exact_at(buf.slice(0..read.size()), read.start, ctx)
.await?
.into_inner();

let blobs_at = read.blobs_at.as_slice();
let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
Expand Down
181 changes: 82 additions & 99 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use crate::context::RequestContext;
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};

use crate::page_cache::PageWriteGuard;
use crate::page_cache::{PageWriteGuard, PAGE_SZ};
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub(crate) mod owned_buffers_io {
//! but for the time being we're proving out the primitives in the neon.git repo
//! for faster iteration.

pub(crate) mod slice;
pub(crate) mod write;
pub(crate) mod util {
pub(crate) mod size_tracking_writer;
Expand Down Expand Up @@ -143,16 +144,17 @@ struct SlotInner {
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
struct PageWriteGuardBuf {
page: PageWriteGuard<'static>,
init_up_to: usize,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
// Page cache pages are zero-initialized, so, wrt uninitialized memory we're good.
// (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
self.page.len()
}
fn bytes_total(&self) -> usize {
self.page.len()
Expand All @@ -166,8 +168,8 @@ unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
}

unsafe fn set_init(&mut self, pos: usize) {
// There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}

Expand Down Expand Up @@ -585,37 +587,37 @@ impl VirtualFile {
Ok(self.pos)
}

pub async fn read_exact_at<B>(
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
///
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
pub async fn read_exact_at<Buf>(
&self,
buf: B,
slice: Slice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> Result<B, Error>
) -> Result<Slice<Buf>, Error>
where
B: IoBufMut + Send,
Buf: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
}
let assert_we_return_original_bounds = if cfg!(debug_assertions) {
Some((slice.stable_ptr() as usize, slice.bytes_total()))
} else {
None
};

pub async fn read_exact_at_n<B>(
&self,
buf: B,
offset: u64,
count: usize,
ctx: &RequestContext,
) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
res.map(|()| buf)
let original_bounds = slice.bounds();
let (buf, res) =
read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
let res = res.map(|_| buf.slice(original_bounds));

if let Some(original_bounds) = assert_we_return_original_bounds {
if let Ok(slice) = &res {
let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
assert_eq!(original_bounds, returned_bounds);
}
}

res
}

/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
Expand All @@ -625,13 +627,11 @@ impl VirtualFile {
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset, ctx).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
let buf = PageWriteGuardBuf { page }.slice_full();
debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
self.read_exact_at(buf, offset, ctx)
.await
.map(|slice| slice.into_inner().page)
}

// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
Expand Down Expand Up @@ -722,14 +722,14 @@ impl VirtualFile {
(buf, Ok(n))
}

pub(crate) async fn read_at<B>(
pub(crate) async fn read_at<Buf>(
&self,
buf: B,
buf: tokio_epoll_uring::Slice<Buf>,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (B, Result<usize, Error>)
) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
Buf: tokio_epoll_uring::IoBufMut + Send,
{
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Expand Down Expand Up @@ -781,26 +781,16 @@ impl VirtualFile {
}

// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
pub async fn read_exact_at_impl<Buf, F, Fut>(
mut buf: tokio_epoll_uring::Slice<Buf>,
mut offset: u64,
count: Option<usize>,
mut read_at: F,
) -> (B, std::io::Result<()>)
) -> (Buf, std::io::Result<()>)
where
B: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
Buf: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
{
let mut buf: tokio_epoll_uring::Slice<B> = match count {
Some(count) => {
assert!(count <= buf.bytes_total());
assert!(count > 0);
buf.slice(..count) // may include uninitialized memory
}
None => buf.slice_full(), // includes all the uninitialized memory
};

while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
Expand Down Expand Up @@ -882,15 +872,15 @@ mod test_read_exact_at_impl {

#[tokio::test]
async fn test_basic() {
let buf = Vec::with_capacity(5);
let buf = Vec::with_capacity(5).slice_full();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 5,
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
Expand All @@ -899,33 +889,13 @@ mod test_read_exact_at_impl {
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}

#[tokio::test]
async fn test_with_count() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a', b'b', b'c']),
}]),
}));

let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c']);
}

#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new();
let buf = Vec::new().slice_full();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
Expand All @@ -935,7 +905,7 @@ mod test_read_exact_at_impl {

#[tokio::test]
async fn test_two_read_at_calls_needed_until_buf_filled() {
let buf = Vec::with_capacity(4);
let buf = Vec::with_capacity(4).slice_full();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
Expand All @@ -950,7 +920,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
Expand All @@ -961,7 +931,7 @@ mod test_read_exact_at_impl {

#[tokio::test]
async fn test_eof_before_buffer_full() {
let buf = Vec::with_capacity(3);
let buf = Vec::with_capacity(3).slice_full();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
Expand All @@ -981,7 +951,7 @@ mod test_read_exact_at_impl {
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
Expand Down Expand Up @@ -1051,27 +1021,29 @@ impl VirtualFile {
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
let slice = Vec::with_capacity(PAGE_SZ).slice_full();
assert_eq!(slice.bytes_total(), PAGE_SZ);
let slice = self
.read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
.await?;
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner()))
}

async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let res;
(tmp, res) = self.read_at(tmp, self.pos, ctx).await;
let slice = tmp.slice(..128);
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
buf.extend_from_slice(&tmp[..n]);
buf.extend_from_slice(&slice[..n]);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
tmp = slice.into_inner();
}
}
}
Expand Down Expand Up @@ -1185,6 +1157,7 @@ mod tests {
use crate::task_mgr::TaskKind;

use super::*;
use owned_buffers_io::slice::SliceExt;
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
Expand All @@ -1206,13 +1179,16 @@ mod tests {
impl MaybeVirtualFile {
async fn read_exact_at(
&self,
mut buf: Vec<u8>,
mut slice: tokio_epoll_uring::Slice<Vec<u8>>,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, Error> {
) -> Result<tokio_epoll_uring::Slice<Vec<u8>>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
MaybeVirtualFile::File(file) => {
let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
problame marked this conversation as resolved.
Show resolved Hide resolved
file.read_exact_at(rust_slice, offset).map(|()| slice)
}
}
}
async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
Expand Down Expand Up @@ -1286,9 +1262,12 @@ mod tests {
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos, ctx).await?;
Ok(String::from_utf8(buf).unwrap())
let slice = Vec::with_capacity(len).slice_full();
assert_eq!(slice.bytes_total(), len);
let slice = self.read_exact_at(slice, pos, ctx).await?;
let vec = slice.into_inner();
assert_eq!(vec.len(), len);
Ok(String::from_utf8(vec).unwrap())
}
}

Expand Down Expand Up @@ -1507,7 +1486,11 @@ mod tests {
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
buf = f
.read_exact_at(buf.slice_full(), 0, &ctx)
.await
.unwrap()
.into_inner();
assert!(buf == SAMPLE);
}
});
Expand Down
Loading
Loading