diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 1241a1390209..7ad8446e0411 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -20,6 +20,7 @@ use std::num::NonZeroUsize; use bytes::BytesMut; use pageserver_api::key::Key; +use tokio_epoll_uring::BoundedBuf; use utils::lsn::Lsn; use utils::vec_map::VecMap; @@ -316,8 +317,9 @@ impl<'a> VectoredBlobReader<'a> { ); let buf = self .file - .read_exact_at_n(buf, read.start, read.size(), ctx) - .await?; + .read_exact_at(buf.slice(0..read.size()), read.start, ctx) + .await? + .into_inner(); let blobs_at = read.blobs_at.as_slice(); let start_offset = blobs_at.first().expect("VectoredRead is never empty").0; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 04d9386fab92..51b0c420c346 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -13,7 +13,7 @@ use crate::context::RequestContext; use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; -use crate::page_cache::PageWriteGuard; +use crate::page_cache::{PageWriteGuard, PAGE_SZ}; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; @@ -48,6 +48,7 @@ pub(crate) mod owned_buffers_io { //! but for the time being we're proving out the primitives in the neon.git repo //! for faster iteration. + pub(crate) mod slice; pub(crate) mod write; pub(crate) mod util { pub(crate) mod size_tracking_writer; @@ -143,16 +144,17 @@ struct SlotInner { /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. struct PageWriteGuardBuf { page: PageWriteGuard<'static>, - init_up_to: usize, } // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. +// Page cache pages are zero-initialized, so, wrt uninitialized memory we're good. +// (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.) unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { fn stable_ptr(&self) -> *const u8 { self.page.as_ptr() } fn bytes_init(&self) -> usize { - self.init_up_to + self.page.len() } fn bytes_total(&self) -> usize { self.page.len() @@ -166,8 +168,8 @@ unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { } unsafe fn set_init(&mut self, pos: usize) { + // There shouldn't really be any reason to call this API since bytes_init() == bytes_total(). assert!(pos <= self.page.len()); - self.init_up_to = pos; } } @@ -585,37 +587,37 @@ impl VirtualFile { Ok(self.pos) } - pub async fn read_exact_at( + /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`. + /// + /// The returned `Slice` is equivalent to the input `slice`, i.e., it's the same view into the same buffer. + pub async fn read_exact_at( &self, - buf: B, + slice: Slice, offset: u64, ctx: &RequestContext, - ) -> Result + ) -> Result, Error> where - B: IoBufMut + Send, + Buf: IoBufMut + Send, { - let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| { - self.read_at(buf, offset, ctx) - }) - .await; - res.map(|()| buf) - } + let assert_we_return_original_bounds = if cfg!(debug_assertions) { + Some((slice.stable_ptr() as usize, slice.bytes_total())) + } else { + None + }; - pub async fn read_exact_at_n( - &self, - buf: B, - offset: u64, - count: usize, - ctx: &RequestContext, - ) -> Result - where - B: IoBufMut + Send, - { - let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| { - self.read_at(buf, offset, ctx) - }) - .await; - res.map(|()| buf) + let original_bounds = slice.bounds(); + let (buf, res) = + read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await; + let res = res.map(|_| buf.slice(original_bounds)); + + if let Some(original_bounds) = assert_we_return_original_bounds { + if let Ok(slice) = &res { + let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total()); + assert_eq!(original_bounds, returned_bounds); + } + } + + res } /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. @@ -625,13 +627,11 @@ impl VirtualFile { offset: u64, ctx: &RequestContext, ) -> Result, Error> { - let buf = PageWriteGuardBuf { - page, - init_up_to: 0, - }; - let res = self.read_exact_at(buf, offset, ctx).await; - res.map(|PageWriteGuardBuf { page, .. }| page) - .map_err(|e| Error::new(ErrorKind::Other, e)) + let buf = PageWriteGuardBuf { page }.slice_full(); + debug_assert_eq!(buf.bytes_total(), PAGE_SZ); + self.read_exact_at(buf, offset, ctx) + .await + .map(|slice| slice.into_inner().page) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 @@ -722,14 +722,14 @@ impl VirtualFile { (buf, Ok(n)) } - pub(crate) async fn read_at( + pub(crate) async fn read_at( &self, - buf: B, + buf: tokio_epoll_uring::Slice, offset: u64, _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ - ) -> (B, Result) + ) -> (tokio_epoll_uring::Slice, Result) where - B: tokio_epoll_uring::BoundedBufMut + Send, + Buf: tokio_epoll_uring::IoBufMut + Send, { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, @@ -781,26 +781,16 @@ impl VirtualFile { } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 -pub async fn read_exact_at_impl( - buf: B, +pub async fn read_exact_at_impl( + mut buf: tokio_epoll_uring::Slice, mut offset: u64, - count: Option, mut read_at: F, -) -> (B, std::io::Result<()>) +) -> (Buf, std::io::Result<()>) where - B: IoBufMut + Send, - F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, - Fut: std::future::Future, std::io::Result)>, + Buf: IoBufMut + Send, + F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, + Fut: std::future::Future, std::io::Result)>, { - let mut buf: tokio_epoll_uring::Slice = match count { - Some(count) => { - assert!(count <= buf.bytes_total()); - assert!(count > 0); - buf.slice(..count) // may include uninitialized memory - } - None => buf.slice_full(), // includes all the uninitialized memory - }; - while buf.bytes_total() != 0 { let res; (buf, res) = read_at(buf, offset).await; @@ -882,7 +872,7 @@ mod test_read_exact_at_impl { #[tokio::test] async fn test_basic() { - let buf = Vec::with_capacity(5); + let buf = Vec::with_capacity(5).slice_full(); let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { expectations: VecDeque::from(vec![Expectation { offset: 0, @@ -890,7 +880,7 @@ mod test_read_exact_at_impl { result: Ok(vec![b'a', b'b', b'c', b'd', b'e']), }]), })); - let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { + let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -899,33 +889,13 @@ mod test_read_exact_at_impl { assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']); } - #[tokio::test] - async fn test_with_count() { - let buf = Vec::with_capacity(5); - let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { - expectations: VecDeque::from(vec![Expectation { - offset: 0, - bytes_total: 3, - result: Ok(vec![b'a', b'b', b'c']), - }]), - })); - - let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| { - let mock_read_at = Arc::clone(&mock_read_at); - async move { mock_read_at.lock().await.read_at(buf, offset).await } - }) - .await; - assert!(res.is_ok()); - assert_eq!(buf, vec![b'a', b'b', b'c']); - } - #[tokio::test] async fn test_empty_buf_issues_no_syscall() { - let buf = Vec::new(); + let buf = Vec::new().slice_full(); let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { expectations: VecDeque::new(), })); - let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { + let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -935,7 +905,7 @@ mod test_read_exact_at_impl { #[tokio::test] async fn test_two_read_at_calls_needed_until_buf_filled() { - let buf = Vec::with_capacity(4); + let buf = Vec::with_capacity(4).slice_full(); let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { expectations: VecDeque::from(vec![ Expectation { @@ -950,7 +920,7 @@ mod test_read_exact_at_impl { }, ]), })); - let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { + let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -961,7 +931,7 @@ mod test_read_exact_at_impl { #[tokio::test] async fn test_eof_before_buffer_full() { - let buf = Vec::with_capacity(3); + let buf = Vec::with_capacity(3).slice_full(); let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { expectations: VecDeque::from(vec![ Expectation { @@ -981,7 +951,7 @@ mod test_read_exact_at_impl { }, ]), })); - let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { + let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -1051,27 +1021,29 @@ impl VirtualFile { ctx: &RequestContext, ) -> Result, std::io::Error> { use crate::page_cache::PAGE_SZ; - let buf = vec![0; PAGE_SZ]; - let buf = self - .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx) + let slice = Vec::with_capacity(PAGE_SZ).slice_full(); + assert_eq!(slice.bytes_total(), PAGE_SZ); + let slice = self + .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx) .await?; - Ok(crate::tenant::block_io::BlockLease::Vec(buf)) + Ok(crate::tenant::block_io::BlockLease::Vec(slice.into_inner())) } async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { let mut tmp = vec![0; 128]; loop { - let res; - (tmp, res) = self.read_at(tmp, self.pos, ctx).await; + let slice = tmp.slice(..128); + let (slice, res) = self.read_at(slice, self.pos, ctx).await; match res { Ok(0) => return Ok(()), Ok(n) => { self.pos += n as u64; - buf.extend_from_slice(&tmp[..n]); + buf.extend_from_slice(&slice[..n]); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } + tmp = slice.into_inner(); } } } @@ -1185,6 +1157,7 @@ mod tests { use crate::task_mgr::TaskKind; use super::*; + use owned_buffers_io::slice::SliceExt; use rand::seq::SliceRandom; use rand::thread_rng; use rand::Rng; @@ -1206,13 +1179,16 @@ mod tests { impl MaybeVirtualFile { async fn read_exact_at( &self, - mut buf: Vec, + mut slice: tokio_epoll_uring::Slice>, offset: u64, ctx: &RequestContext, - ) -> Result, Error> { + ) -> Result>, Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await, - MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf), + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await, + MaybeVirtualFile::File(file) => { + let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed(); + file.read_exact_at(rust_slice, offset).map(|()| slice) + } } } async fn write_all_at, Buf: IoBuf + Send>( @@ -1286,9 +1262,12 @@ mod tests { len: usize, ctx: &RequestContext, ) -> Result { - let buf = vec![0; len]; - let buf = self.read_exact_at(buf, pos, ctx).await?; - Ok(String::from_utf8(buf).unwrap()) + let slice = Vec::with_capacity(len).slice_full(); + assert_eq!(slice.bytes_total(), len); + let slice = self.read_exact_at(slice, pos, ctx).await?; + let vec = slice.into_inner(); + assert_eq!(vec.len(), len); + Ok(String::from_utf8(vec).unwrap()) } } @@ -1507,7 +1486,11 @@ mod tests { let mut rng = rand::rngs::OsRng; for _ in 1..1000 { let f = &files[rng.gen_range(0..files.len())]; - buf = f.read_exact_at(buf, 0, &ctx).await.unwrap(); + buf = f + .read_exact_at(buf.slice_full(), 0, &ctx) + .await + .unwrap() + .into_inner(); assert!(buf == SAMPLE); } }); diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 7a27be2ca108..2820cea097d1 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -107,7 +107,7 @@ use std::{ sync::atomic::{AtomicU8, Ordering}, }; -use super::{FileGuard, Metadata}; +use super::{owned_buffers_io::slice::SliceExt, FileGuard, Metadata}; #[cfg(target_os = "linux")] fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std::io::Error { @@ -120,38 +120,29 @@ fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error) -> std: } impl IoEngine { - pub(super) async fn read_at( + pub(super) async fn read_at( &self, file_guard: FileGuard, offset: u64, - mut buf: B, - ) -> ((FileGuard, B), std::io::Result) + mut slice: tokio_epoll_uring::Slice, + ) -> ( + (FileGuard, tokio_epoll_uring::Slice), + std::io::Result, + ) where - B: tokio_epoll_uring::BoundedBufMut + Send, + Buf: tokio_epoll_uring::IoBufMut + Send, { match self { IoEngine::NotSet => panic!("not initialized"), IoEngine::StdFs => { - // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory. - let dst = unsafe { - std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) - }; - let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset)); - if let Ok(nbytes) = &res { - assert!(*nbytes <= buf.bytes_total()); - // SAFETY: see above assertion - unsafe { - buf.set_init(*nbytes); - } - } - #[allow(dropping_references)] - drop(dst); - ((file_guard, buf), res) + let rust_slice = slice.as_mut_rust_slice_full_zeroed(); + let res = file_guard.with_std_file(|std_file| std_file.read_at(rust_slice, offset)); + ((file_guard, slice), res) } #[cfg(target_os = "linux")] IoEngine::TokioEpollUring => { let system = tokio_epoll_uring_ext::thread_local_system().await; - let (resources, res) = system.read(file_guard, offset, buf).await; + let (resources, res) = system.read(file_guard, offset, slice).await; (resources, res.map_err(epoll_uring_error_to_std)) } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/slice.rs b/pageserver/src/virtual_file/owned_buffers_io/slice.rs new file mode 100644 index 000000000000..d19e5ddffefb --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/slice.rs @@ -0,0 +1,121 @@ +use tokio_epoll_uring::BoundedBuf; +use tokio_epoll_uring::BoundedBufMut; +use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::Slice; + +pub(crate) trait SliceExt { + /// Get a `&mut[0..self.bytes_total()`] slice, for when you need to do borrow-based IO. + /// + /// See the test case `test_slice_full_zeroed` for the difference to just doing `&slice[..]` + fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8]; +} + +impl SliceExt for Slice +where + B: IoBufMut, +{ + #[inline(always)] + fn as_mut_rust_slice_full_zeroed(&mut self) -> &mut [u8] { + // zero-initialize the uninitialized parts of the buffer so we can create a Rust slice + // + // SAFETY: we own `slice`, don't write outside the bounds + unsafe { + let to_init = self.bytes_total() - self.bytes_init(); + self.stable_mut_ptr() + .add(self.bytes_init()) + .write_bytes(0, to_init); + self.set_init(self.bytes_total()); + }; + let bytes_total = self.bytes_total(); + &mut self[0..bytes_total] + } +} + +#[cfg(test)] +mod tests { + use std::io::Read; + + use super::*; + use bytes::Buf; + use tokio_epoll_uring::Slice; + + #[test] + fn test_slice_full_zeroed() { + let make_fake_file = || bytes::BytesMut::from(&b"12345"[..]).reader(); + + // before we start the test, let's make sure we have a shared understanding of what slice_full does + { + let buf = Vec::with_capacity(3); + let slice: Slice<_> = buf.slice_full(); + assert_eq!(slice.bytes_init(), 0); + assert_eq!(slice.bytes_total(), 3); + let rust_slice = &slice[..]; + assert_eq!( + rust_slice.len(), + 0, + "Slice only derefs to a &[u8] of the initialized part" + ); + } + + // and also let's establish a shared understanding of .slice() + { + let buf = Vec::with_capacity(3); + let slice: Slice<_> = buf.slice(0..2); + assert_eq!(slice.bytes_init(), 0); + assert_eq!(slice.bytes_total(), 2); + let rust_slice = &slice[..]; + assert_eq!( + rust_slice.len(), + 0, + "Slice only derefs to a &[u8] of the initialized part" + ); + } + + // the above leads to the easy mistake of using slice[..] for borrow-based IO like so: + { + let buf = Vec::with_capacity(3); + let mut slice: Slice<_> = buf.slice_full(); + assert_eq!(slice[..].len(), 0); + let mut file = make_fake_file(); + file.read_exact(&mut slice[..]).unwrap(); // one might think this reads 3 bytes but it reads 0 + assert_eq!(&slice[..] as &[u8], &[][..] as &[u8]); + } + + // With owned buffers IO like with VirtualFilem, you could totally + // pass in a `Slice` with bytes_init()=0 but bytes_total()=5 + // and it will read 5 bytes into the slice, and return a slice that has bytes_init()=5. + { + // TODO: demo + } + + // + // Ok, now that we have a shared understanding let's demo how to use the extension trait. + // + + // slice_full() + { + let buf = Vec::with_capacity(3); + let mut slice: Slice<_> = buf.slice_full(); + let rust_slice = slice.as_mut_rust_slice_full_zeroed(); + assert_eq!(rust_slice.len(), 3); + assert_eq!(rust_slice, &[0, 0, 0]); + let mut file = make_fake_file(); + file.read_exact(rust_slice).unwrap(); + assert_eq!(rust_slice, b"123"); + assert_eq!(&slice[..], b"123"); + } + + // .slice(..) + { + let buf = Vec::with_capacity(3); + let mut slice: Slice<_> = buf.slice(0..2); + let rust_slice = slice.as_mut_rust_slice_full_zeroed(); + assert_eq!(rust_slice.len(), 2); + assert_eq!(rust_slice, &[0, 0]); + let mut file = make_fake_file(); + file.read_exact(rust_slice).unwrap(); + assert_eq!(rust_slice, b"12"); + assert_eq!(&slice[..], b"12"); + } + } +}