Implement decompression for vectored reads
arpad-m committed Jul 6, 2024
1 parent 0a937b7 commit 96f697b
Showing 2 changed files with 49 additions and 17 deletions.
pageserver/src/tenant/blob_io.rs (6 changes: 3 additions & 3 deletions)
@@ -137,14 +137,14 @@ impl<'a> BlockCursor<'a> {
 }
 
 /// Reserved bits for length and compression
-const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
+pub(super) const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
 
 /// The maximum size of blobs we support. The highest few bits
 /// are reserved for compression and other further uses.
 const MAX_SUPPORTED_LEN: usize = 0x0fff_ffff;
 
-const BYTE_UNCOMPRESSED: u8 = 0x80;
-const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
+pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
+pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
 
 /// A wrapper of `VirtualFile` that allows users to write blobs.
 ///
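For readers unfamiliar with the header layout these constants describe, here is a small standalone sketch, not part of the commit: a first byte below 0x80 is a one-byte length with no compression; otherwise the prefix is four bytes big-endian, the top nibble carries the compression discriminant (`BYTE_UNCOMPRESSED` or `BYTE_ZSTD`), and the remaining bits carry the length. The `parse_header` helper and the example values are illustrative only.

```rust
// Standalone sketch of the blob header layout; the constants mirror blob_io.rs,
// parse_header is a hypothetical helper for illustration.
const LEN_COMPRESSION_BIT_MASK: u8 = 0xf0;
const BYTE_UNCOMPRESSED: u8 = 0x80;
const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;

/// Returns (header length in bytes, blob length, compression discriminant).
fn parse_header(buf: &[u8]) -> (usize, u64, u8) {
    let first = buf[0];
    if first < 0x80 {
        // Short form: one byte, lengths up to 0x7f, always uncompressed.
        (1, first as u64, BYTE_UNCOMPRESSED)
    } else {
        // Long form: four bytes big-endian; the high nibble holds the compression bits.
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[..4]);
        len_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
        (
            4,
            u32::from_be_bytes(len_buf) as u64,
            first & LEN_COMPRESSION_BIT_MASK,
        )
    }
}

fn main() {
    // 0x2a < 0x80: short header for a 42-byte uncompressed blob.
    assert_eq!(parse_header(&[0x2a]), (1, 42, BYTE_UNCOMPRESSED));
    // 0x90 00 01 00: long header for a 256-byte zstd-compressed blob.
    assert_eq!(parse_header(&[0x90, 0x00, 0x01, 0x00]), (4, 256, BYTE_ZSTD));
}
```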
pageserver/src/tenant/vectored_blob_io.rs (60 changes: 46 additions & 14 deletions)
@@ -20,11 +20,13 @@ use std::num::NonZeroUsize;

 use bytes::BytesMut;
 use pageserver_api::key::Key;
+use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::BoundedBuf;
 use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
 
 use crate::context::RequestContext;
+use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
 use crate::virtual_file::VirtualFile;
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -315,7 +317,7 @@ impl<'a> VectoredBlobReader<'a> {
             read.size(),
             buf.capacity()
         );
-        let buf = self
+        let mut buf = self
             .file
             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
             .await?
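Aside, not part of the diff: `buf` becomes mutable because the decompression path below appends decoded bytes to the same buffer that holds the raw read, and each blob then records a `(start, end)` range into it. A tiny sketch of that bookkeeping, with a hypothetical `append_blob` helper (assumes the `bytes` crate):

```rust
// Hypothetical sketch of the bookkeeping a mutable read buffer enables:
// decompressed payloads are appended to the same BytesMut that holds the
// raw read, and each blob records a (start, end) range into that buffer.
use bytes::BytesMut;

fn append_blob(buf: &mut BytesMut, decompressed: &[u8]) -> (usize, usize) {
    let start = buf.len();
    buf.extend_from_slice(decompressed);
    (start, buf.len())
}

fn main() {
    let mut buf = BytesMut::new();
    buf.extend_from_slice(b"raw compressed bytes...");
    let (start, end) = append_blob(&mut buf, b"decoded payload");
    assert_eq!(&buf[start..end], &b"decoded payload"[..]);
}
```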
@@ -337,38 +339,68 @@ impl<'a> VectoredBlobReader<'a> {
                 .chain(std::iter::once(None)),
         );
 
+        // Some scratch space, put here for reusing the allocation
+        let mut decompressed_vec = Vec::new();
+
         for ((offset, meta), next) in pairs {
             let offset_in_buf = offset - start_offset;
             let first_len_byte = buf[offset_in_buf as usize];
 
-            // Each blob is prefixed by a header containing it's size.
+            // Each blob is prefixed by a header containing its size and compression information.
             // Extract the size and skip that header to find the start of the data.
             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
             // 1 byte case and 1 in the 4 byte case.
-            let (size_length, blob_size) = if first_len_byte < 0x80 {
-                (1, first_len_byte as u64)
+            let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
+                (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
             } else {
                 let mut blob_size_buf = [0u8; 4];
                 let offset_in_buf = offset_in_buf as usize;
 
                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
-                blob_size_buf[0] &= 0x7f;
-                (4, u32::from_be_bytes(blob_size_buf) as u64)
+                blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
+
+                let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
+                (
+                    4,
+                    u32::from_be_bytes(blob_size_buf) as u64,
+                    compression_bits,
+                )
             };
 
-            let start = offset_in_buf + size_length;
-            let end = match next {
+            let start_raw = offset_in_buf + size_length;
+            let end_raw = match next {
                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
-                None => start + blob_size,
+                None => start_raw + blob_size,
             };
 
-            assert_eq!(end - start, blob_size);
+            assert_eq!(end_raw - start_raw, blob_size);
+            let (start, end);
+            if compression_bits == BYTE_UNCOMPRESSED {
+                start = start_raw as usize;
+                end = end_raw as usize;
+            } else if compression_bits == BYTE_ZSTD {
+                let mut decoder =
+                    async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
+                decoder
+                    .write_all(&buf[start_raw as usize..end_raw as usize])
+                    .await?;
+                decoder.flush().await?;
+                start = buf.len();
+                buf.extend_from_slice(&decompressed_vec);
+                end = buf.len();
+                decompressed_vec.clear();
+            } else {
+                let error = std::io::Error::new(
+                    std::io::ErrorKind::InvalidData,
+                    format!("invalid compression byte {compression_bits:x}"),
+                );
+                return Err(error);
+            }
 
             metas.push(VectoredBlob {
-                start: start as usize,
-                end: end as usize,
+                start,
+                end,
                 meta: *meta,
-            })
+            });
         }
 
         Ok(VectoredBlobsBuf { buf, blobs: metas })
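As a companion to the new zstd branch above, here is a minimal, self-contained round trip showing the write-side decoder pattern the vectored read now relies on: compressed bytes are written into an `async_compression::tokio::write::ZstdDecoder` backed by a scratch `Vec<u8>` and then flushed. This is a sketch, not part of the commit; the encoder half, the `#[tokio::main]` harness, and the crate features named in the comment are assumptions for illustration.

```rust
// Illustrative round trip; assumes async-compression with features ["tokio", "zstd"]
// and tokio with features ["macros", "rt", "io-util"] in Cargo.toml.
use async_compression::tokio::write::{ZstdDecoder, ZstdEncoder};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let payload = b"some blob contents ".repeat(16);

    // Compress into a Vec<u8> (Vec implements tokio's AsyncWrite).
    let mut compressed = Vec::new();
    {
        let mut encoder = ZstdEncoder::new(&mut compressed);
        encoder.write_all(&payload).await?;
        encoder.shutdown().await?; // finish the zstd frame
    }

    // Decompress the same way the vectored read path does: feed the
    // compressed bytes into a write-side decoder backed by a scratch Vec.
    let mut decompressed = Vec::new();
    {
        let mut decoder = ZstdDecoder::new(&mut decompressed);
        decoder.write_all(&compressed).await?;
        decoder.flush().await?;
    }

    assert_eq!(decompressed, payload);
    Ok(())
}
```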
