diff --git a/pco/README.md b/pco/README.md
index 6a187492..762d793e 100644
--- a/pco/README.md
+++ b/pco/README.md
@@ -31,8 +31,8 @@ fn main() {
 
 To run something right away, try [the benchmarks](../bench/README.md).
 
-For a lower-level standalone API that allows writing/reading one chunk at a time and
-extracting all metadata, see [the docs.rs documentation](https://docs.rs/pco/latest/pco/).
+For a lower-level standalone API that allows writing one chunk at a time and
+streaming reads, see [the docs.rs documentation](https://docs.rs/pco/latest/pco/).
 
 ## Usage as a Wrapped Format
diff --git a/pco/src/ans/decoding.rs b/pco/src/ans/decoding.rs
index cdd01f37..ac58e9c4 100644
--- a/pco/src/ans/decoding.rs
+++ b/pco/src/ans/decoding.rs
@@ -60,17 +60,13 @@ impl Decoder {
   #[inline]
   pub fn unchecked_decode(&mut self, reader: &mut BitReader) -> Token {
     let node = &self.nodes[self.state - self.table_size];
-    let bits_read = reader.unchecked_read_uint::<usize>(node.bits_to_read);
-    let next_state = node.next_state_base + bits_read;
-    self.state = next_state;
+    self.state = node.next_state_base + reader.unchecked_read_uint::<usize>(node.bits_to_read);
     node.token
   }
 
   pub fn decode(&mut self, reader: &mut BitReader) -> PcoResult<Token> {
     let node = &self.nodes[self.state - self.table_size];
-    let bits_read = reader.read_small(node.bits_to_read)?;
-    let next_state = node.next_state_base + bits_read;
-    self.state = next_state;
+    self.state = node.next_state_base + reader.read_small(node.bits_to_read)?;
     Ok(node.token)
   }
 }
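For orientation on the `ans` change above: a decode step looks up the table node for the current state, emits that node's token, and then refills the state from the bit stream, so `state` always stays in `[table_size, 2 * table_size)`. A toy sketch of that transition (illustrative only — `Node` and `decode_step` here are hypothetical stand-ins mirroring the fields used above, not the actual pco types):

```rust
struct Node {
  token: u8,
  bits_to_read: u32,
  next_state_base: usize,
}

/// One tANS decode step, given the `bits_read` value just pulled off the
/// bit stream (a read of `node.bits_to_read` bits).
fn decode_step(nodes: &[Node], table_size: usize, state: &mut usize, bits_read: usize) -> u8 {
  let node = &nodes[*state - table_size];
  // consuming the node's bits moves the state back up into
  // [table_size, 2 * table_size)
  *state = node.next_state_base + bits_read;
  node.token
}
```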
diff --git a/pco/src/base_compressor.rs b/pco/src/base_compressor.rs
index c3650126..2c6119df 100644
--- a/pco/src/base_compressor.rs
+++ b/pco/src/base_compressor.rs
@@ -3,7 +3,7 @@ use std::fmt::Debug;
 
 use crate::bin::{Bin, BinCompressionInfo};
 use crate::bit_writer::BitWriter;
-use crate::chunk_metadata::{ChunkMetadata, ChunkStreamMetadata};
+use crate::chunk_metadata::{ChunkMetadata, ChunkStreamMetadata, PageMetadata, PageStreamMetadata};
 use crate::chunk_spec::ChunkSpec;
 use crate::compression_table::CompressionTable;
 use crate::constants::*;
@@ -454,7 +454,7 @@ pub struct MidChunkInfo<U: UnsignedLike> {
 }
 
 impl<U: UnsignedLike> MidChunkInfo<U> {
-  fn data_page_moments(&self, stream_idx: usize) -> &DeltaMoments<U> {
+  fn page_moments(&self, stream_idx: usize) -> &DeltaMoments<U> {
     &self.stream_configs[stream_idx].delta_momentss[self.page_idx]
   }
 
@@ -660,7 +660,7 @@ impl<T: NumberLike> BaseCompressor<T> {
     Ok(meta)
   }
 
-  pub fn data_page_internal(&mut self) -> PcoResult<()> {
+  pub fn page_internal(&mut self) -> PcoResult<()> {
     let info = match &mut self.state {
       State::MidChunk(info) => Ok(info),
       other => Err(other.wrong_step_err("data page")),
     }?;
 
     let decomposeds = decompose_unsigneds(info)?;
 
+    let mut streams = Vec::with_capacity(info.n_streams);
     for stream_idx in 0..info.n_streams {
-      info
-        .data_page_moments(stream_idx)
-        .write_to(&mut self.writer);
+      let delta_moments = info.page_moments(stream_idx).clone();
 
       // write the final ANS state, moving it down the range [0, table_size)
-      let size_log = info.stream_configs[stream_idx].encoder.size_log();
-      let final_state = decomposeds.ans_final_state(stream_idx);
-      self
-        .writer
-        .write_usize(final_state - (1 << size_log), size_log);
+      let ans_final_state = decomposeds.ans_final_state(stream_idx);
+      streams.push(PageStreamMetadata {
+        delta_moments,
+        ans_final_state,
+      });
     }
-
-    self.writer.finish_byte();
+    let page_meta = PageMetadata { streams };
+    let ans_size_logs = info
+      .stream_configs
+      .iter()
+      .map(|config| config.encoder.size_log());
+    page_meta.write_to(ans_size_logs, &mut self.writer);
 
     match info.n_nontrivial_streams {
       0 => write_decomposeds::<_, 0>(
diff --git a/pco/src/base_decompressor.rs b/pco/src/base_decompressor.rs
index 7f9e0a6f..7ca94d26 100644
--- a/pco/src/base_decompressor.rs
+++ b/pco/src/base_decompressor.rs
@@ -3,10 +3,10 @@ use std::io::Write;
 
 use crate::bit_reader::BitReader;
 use crate::bit_words::PaddedBytes;
-use crate::body_decompressor::BodyDecompressor;
-use crate::chunk_metadata::{ChunkMetadata, DataPageMetadata};
+use crate::chunk_metadata::{ChunkMetadata, PageMetadata};
 use crate::constants::{MAGIC_CHUNK_BYTE, MAGIC_HEADER, MAGIC_TERMINATION_BYTE};
 use crate::data_types::NumberLike;
+use crate::page_decompressor::PageDecompressor;
 use crate::errors::{PcoError, PcoResult};
 use crate::Flags;
 
@@ -33,7 +33,7 @@ pub struct State<T: NumberLike> {
   pub bit_idx: usize,
   pub flags: Option<Flags>,
   pub chunk_meta: Option<ChunkMetadata<T::Unsigned>>,
-  pub body_decompressor: Option<BodyDecompressor<T>>,
+  pub page_decompressor: Option<PageDecompressor<T>>,
   pub terminated: bool,
 }
 
@@ -91,14 +91,14 @@ impl<T: NumberLike> State<T> {
     ChunkMetadata::<T::Unsigned>::parse_from(reader, self.flags.as_ref().unwrap()).map(Some)
   }
 
-  pub fn new_body_decompressor(
+  pub fn new_page_decompressor(
     &self,
     reader: &mut BitReader,
     n: usize,
     compressed_page_size: usize,
-  ) -> PcoResult<BodyDecompressor<T>> {
+  ) -> PcoResult<PageDecompressor<T>> {
     let start_bit_idx = reader.bit_idx();
-    let res = self.new_body_decompressor_dirty(reader, n, compressed_page_size);
+    let res = self.new_page_decompressor_dirty(reader, n, compressed_page_size);
 
     if res.is_err() {
       reader.seek_to(start_bit_idx);
@@ -106,16 +106,16 @@
     res
   }
 
-  fn new_body_decompressor_dirty(
+  fn new_page_decompressor_dirty(
    &self,
    reader: &mut BitReader,
    n: usize,
    compressed_page_size: usize,
-  ) -> PcoResult<BodyDecompressor<T>> {
+  ) -> PcoResult<PageDecompressor<T>> {
    let chunk_meta = self.chunk_meta.as_ref().unwrap();
    let start_byte_idx = reader.aligned_byte_idx()?;
-    let data_page_meta = DataPageMetadata::parse_from(reader, chunk_meta)?;
+    let page_meta = PageMetadata::parse_from(reader, chunk_meta)?;
    let end_byte_idx = reader.aligned_byte_idx()?;
 
    let compressed_body_size = compressed_page_size
@@ -124,11 +124,11 @@
        PcoError::corruption("compressed page size {} is less than data page metadata size")
      })?;
 
-    BodyDecompressor::new(
+    PageDecompressor::new(
      n,
      compressed_body_size,
      chunk_meta,
-      data_page_meta,
+      page_meta,
    )
  }
 
@@ -139,10 +139,10 @@
      Step::Terminated
    } else if self.chunk_meta.is_none() {
      Step::StartOfChunk
-    } else if self.body_decompressor.is_none() {
-      Step::StartOfDataPage
+    } else if self.page_decompressor.is_none() {
+      Step::StartOfPage
    } else {
-      Step::MidDataPage
+      Step::MidPage
    }
  }
}
@@ -151,8 +151,8 @@
pub enum Step {
  PreHeader,
  StartOfChunk,
-  StartOfDataPage,
-  MidDataPage,
+  StartOfPage,
+  MidPage,
  Terminated,
}
 
@@ -161,8 +161,8 @@ impl Step {
    let step_str = match self {
      Step::PreHeader => "has not yet parsed header",
      Step::StartOfChunk => "is at the start of a chunk",
-      Step::StartOfDataPage => "is at the start of a data page",
-      Step::MidDataPage => "is mid-data-page",
+      Step::StartOfPage => "is at the start of a data page",
+      Step::MidPage => "is mid-data-page",
      Step::Terminated => "has already parsed the footer",
    };
    PcoError::invalid_argument(format!(
@@ -228,18 +228,18 @@ impl<T: NumberLike> BaseDecompressor<T> {
    })
  }
 
-  pub fn data_page_internal(
+  pub fn page_internal(
    &mut self,
    n: usize,
    compressed_page_size: usize,
    dest: &mut [T],
  ) -> PcoResult<()> {
-    let old_bd = self.state.body_decompressor.clone();
+    let old_bd = self.state.page_decompressor.clone();
    self.with_reader(|reader, state, _| {
-      let mut bd = state.new_body_decompressor(reader, n, compressed_page_size)?;
+      let mut bd = state.new_page_decompressor(reader, n, compressed_page_size)?;
      let res = bd.decompress(reader, true, dest);
      // we need to roll back the body decompressor if this failed
-      state.body_decompressor = if res.is_ok() { None } else { old_bd };
+      state.page_decompressor = if res.is_ok() { None } else { old_bd };
      res?;
      Ok(())
    })
diff --git a/pco/src/num_decompressor.rs b/pco/src/batch_decompressor.rs
similarity index 85%
rename from pco/src/num_decompressor.rs
rename to pco/src/batch_decompressor.rs
index 404679f0..6befefe9 100644
--- a/pco/src/num_decompressor.rs
+++ b/pco/src/batch_decompressor.rs
@@ -6,7 +6,7 @@ use std::mem::MaybeUninit;
 
 use crate::bin::BinDecompressionInfo;
 use crate::bit_reader::BitReader;
-use crate::chunk_metadata::DataPageMetadata;
+use crate::chunk_metadata::PageMetadata;
 use crate::constants::{
   Bitlen, DECOMPRESS_UNCHECKED_THRESHOLD, MAX_DELTA_ENCODING_ORDER, MAX_N_STREAMS,
 };
@@ -64,11 +64,9 @@ impl<U: UnsignedLike> State<U> {
   }
 }
 
-pub trait NumDecompressor<U: UnsignedLike>: Debug {
+pub trait BatchDecompressor<U: UnsignedLike>: Debug {
   fn bits_remaining(&self) -> usize;
 
-  fn initial_value_required(&self, stream_idx: usize) -> Option<U>;
-
   fn decompress_unsigneds(
     &mut self,
     reader: &mut BitReader,
@@ -76,7 +74,7 @@
     dst: &mut UnsignedDst<U>,
   ) -> PcoResult<Progress>;
 
-  fn clone_inner(&self) -> Box<dyn NumDecompressor<U>>;
+  fn clone_inner(&self) -> Box<dyn BatchDecompressor<U>>;
 }
@@ -85,9 +83,9 @@ struct StreamConfig<U: UnsignedLike> {
   delta_order: usize, // only used to infer how many extra 0's are at the end
 }
 
-// NumDecompressor does the main work of decoding bytes into NumberLikes
+// BatchDecompressor does the main work of decoding bytes into UnsignedLikes
 #[derive(Clone, Debug)]
-struct NumDecompressorImpl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> {
+struct BatchDecompressorImpl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> {
   // known information about the chunk
   n: usize,
   compressed_body_size: usize,
@@ -100,11 +98,11 @@
   state: State<U>,
 }
 
-struct NumDecompressorInputs<'a, U: UnsignedLike> {
+struct BatchDecompressorInputs<'a, U: UnsignedLike> {
   n: usize,
   compressed_body_size: usize,
   chunk_meta: &'a ChunkMetadata<U>,
-  data_page_meta: DataPageMetadata<U>,
+  page_meta: PageMetadata<U>,
   max_bits_per_num_block: Bitlen,
   initial_values_required: [Option<U>; MAX_N_STREAMS],
 }
@@ -114,8 +112,8 @@ pub fn new<U: UnsignedLike>(
   n: usize,
   compressed_body_size: usize,
   chunk_meta: &ChunkMetadata<U>,
-  data_page_meta: DataPageMetadata<U>,
-) -> PcoResult<Box<dyn NumDecompressor<U>>> {
+  page_meta: PageMetadata<U>,
+) -> PcoResult<Box<dyn BatchDecompressor<U>>> {
   let mut max_bits_per_num_block = 0;
   for stream in &chunk_meta.streams {
     max_bits_per_num_block += stream
@@ -139,39 +137,35 @@
       .and_then(|stream_meta| stream_meta.bins.get(0))
       .map(|only_bin| only_bin.lower);
   }
-  let inputs = NumDecompressorInputs {
+  let inputs = BatchDecompressorInputs {
     n,
     compressed_body_size,
     chunk_meta,
-    data_page_meta,
+    page_meta,
     max_bits_per_num_block,
     initial_values_required,
   };
-  let res: Box<dyn NumDecompressor<U>> = match (needs_gcd, n_streams) {
-    (false, 0) => Box::new(NumDecompressorImpl::<U, ClassicMode, 0>::new(inputs)?),
-    (false, 1) => Box::new(NumDecompressorImpl::<U, ClassicMode, 1>::new(inputs)?),
-    (true, 1) => Box::new(NumDecompressorImpl::<U, GcdMode, 1>::new(
+  let res: Box<dyn BatchDecompressor<U>> = match (needs_gcd, n_streams) {
+    (false, 0) => Box::new(BatchDecompressorImpl::<U, ClassicMode, 0>::new(inputs)?),
+    (false, 1) => Box::new(BatchDecompressorImpl::<U, ClassicMode, 1>::new(inputs)?),
+    (true, 1) => Box::new(BatchDecompressorImpl::<U, GcdMode, 1>::new(
      inputs,
    )?),
-    (false, 2) => Box::new(NumDecompressorImpl::<U, ClassicMode, 2>::new(inputs)?),
+    (false, 2) => Box::new(BatchDecompressorImpl::<U, ClassicMode, 2>::new(inputs)?),
    _ => panic!("unknown decompression implementation; should be unreachable"),
  };
  Ok(res)
}
 
-impl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> NumDecompressor<U>
-  for NumDecompressorImpl<U, M, STREAMS>
+impl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> BatchDecompressor<U>
+  for BatchDecompressorImpl<U, M, STREAMS>
{
  fn bits_remaining(&self) -> usize {
    self.compressed_body_size * 8 - self.state.bits_processed
  }
 
-  fn initial_value_required(&self, stream_idx: usize) -> Option<U> {
-    self.initial_values_required[stream_idx]
-  }
-
  // If hits a corruption, it returns an error and leaves reader and self unchanged.
  // State managed here: n_processed, bits_processed
  fn decompress_unsigneds(
@@ -209,32 +203,32 @@
    res
  }
 
-  fn clone_inner(&self) -> Box<dyn NumDecompressor<U>> {
+  fn clone_inner(&self) -> Box<dyn BatchDecompressor<U>> {
    Box::new(self.clone())
  }
}
 
-impl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> NumDecompressorImpl<U, M, STREAMS> {
-  fn new(inputs: NumDecompressorInputs<U>) -> PcoResult<Self> {
-    let NumDecompressorInputs {
+impl<U: UnsignedLike, M: Mode<U>, const STREAMS: usize> BatchDecompressorImpl<U, M, STREAMS> {
+  fn new(inputs: BatchDecompressorInputs<U>) -> PcoResult<Self> {
+    let BatchDecompressorInputs {
      n,
      compressed_body_size,
      chunk_meta,
-      data_page_meta,
+      page_meta,
      max_bits_per_num_block,
      initial_values_required,
    } = inputs;
 
    let mut decoders: [MaybeUninit<ans::Decoder>; STREAMS] =
      unsafe { MaybeUninit::uninit().assume_init() };
 
-    let delta_orders = data_page_meta
+    let delta_orders = page_meta
      .streams
      .iter()
      .map(|stream| stream.delta_moments.order())
      .collect::<Vec<_>>();
    for stream_idx in 0..STREAMS {
      let chunk_stream = &chunk_meta.streams[stream_idx];
-      let page_stream = &data_page_meta.streams[stream_idx];
+      let page_stream = &page_meta.streams[stream_idx];
      let delta_order = delta_orders[stream_idx];
 
      if chunk_stream.bins.is_empty() && n > delta_order {
@@ -335,6 +329,14 @@
    res
  }
 
+  fn fill_dst_if_needed(&self, dst: &mut UnsignedDst<U>) {
+    for (stream_idx, maybe_initial_value) in self.initial_values_required.iter().enumerate() {
+      if let Some(initial_value) = maybe_initial_value {
+        dst.stream(stream_idx).fill(*initial_value);
+      }
+    }
+  }
+
  #[inline(never)]
  fn decompress_unsigneds_dirty(
    &mut self,
@@ -355,6 +357,8 @@
      });
    }
 
+    self.fill_dst_if_needed(dst);
+
    let mark_insufficient = |dst: &mut UnsignedDst<U>, e: PcoError| {
      if error_on_insufficient_data {
        Err(e)
@@ -402,7 +406,7 @@
  }
}
 
-impl<U: UnsignedLike> Clone for Box<dyn NumDecompressor<U>> {
+impl<U: UnsignedLike> Clone for Box<dyn BatchDecompressor<U>> {
  fn clone(&self) -> Self {
    self.clone_inner()
  }
diff --git a/pco/src/chunk_metadata.rs b/pco/src/chunk_metadata.rs
index 01cd0a33..c31f2407 100644
--- a/pco/src/chunk_metadata.rs
+++ b/pco/src/chunk_metadata.rs
@@ -50,18 +50,21 @@ impl<U: UnsignedLike> ChunkStreamMetadata<U> {
 }
 
 #[derive(Clone, Debug)]
-pub struct DataPageStreamMetadata<U: UnsignedLike> {
+pub struct PageStreamMetadata<U: UnsignedLike> {
   pub delta_moments: DeltaMoments<U>,
   pub ans_final_state: usize,
 }
 
-impl<U: UnsignedLike> DataPageStreamMetadata<U> {
-  // pub fn write_to(&self, ans_size_log: Bitlen, writer: &mut BitWriter) {
-  //   self.delta_moments.write_to(writer);
-  //
-  //   // write the final ANS state, moving it down the range [0, table_size)
-  //   writer.write_usize(self.ans_final_state - (1 << ans_size_log), ans_size_log);
-  // }
+impl<U: UnsignedLike> PageStreamMetadata<U> {
+  pub fn write_to(&self, ans_size_log: Bitlen, writer: &mut BitWriter) {
+    self.delta_moments.write_to(writer);
+
+    // write the final ANS state, moving it down the range [0, table_size)
+    writer.write_usize(
+      self.ans_final_state - (1 << ans_size_log),
+      ans_size_log,
+    );
+  }
 
   pub fn parse_from(
     reader: &mut BitReader,
@@ -113,22 +116,22 @@ pub struct ChunkMetadata<U: UnsignedLike> {
 // chunk metadata parsing step (standalone mode) OR from the wrapping format
 // (wrapped mode).
 #[derive(Clone, Debug)]
-pub struct DataPageMetadata<U: UnsignedLike> {
-  pub streams: Vec<DataPageStreamMetadata<U>>,
+pub struct PageMetadata<U: UnsignedLike> {
+  pub streams: Vec<PageStreamMetadata<U>>,
 }
 
-impl<U: UnsignedLike> DataPageMetadata<U> {
-  // pub fn write_to(&self, chunk_meta: &ChunkMetadata<U>, writer: &mut BitWriter) {
-  //   for (stream_idx, stream_meta) in chunk_meta.streams.iter().enumerate() {
-  //     self.streams[stream_idx].write_to(stream_meta.ans_size_log, writer);
-  //   }
-  //   writer.finish_byte();
-  // }
+impl<U: UnsignedLike> PageMetadata<U> {
+  pub fn write_to<I: Iterator<Item = Bitlen>>(&self, ans_size_logs: I, writer: &mut BitWriter) {
+    for (stream_idx, ans_size_log) in ans_size_logs.enumerate() {
+      self.streams[stream_idx].write_to(ans_size_log, writer);
+    }
+    writer.finish_byte();
+  }
 
   pub fn parse_from(reader: &mut BitReader, chunk_meta: &ChunkMetadata<U>) -> PcoResult<Self> {
     let mut streams = Vec::with_capacity(chunk_meta.streams.len());
     for (stream_idx, stream_meta) in chunk_meta.streams.iter().enumerate() {
-      streams.push(DataPageStreamMetadata::parse_from(
+      streams.push(PageStreamMetadata::parse_from(
         reader,
         chunk_meta.stream_delta_order(stream_idx),
         stream_meta.ans_size_log,
@@ -352,7 +355,7 @@ impl<U: UnsignedLike> ChunkMetadata<U> {
 
 #[derive(Clone, Debug, Default)]
 #[non_exhaustive]
-pub enum DataPagingSpec {
+pub enum PagingSpec {
   #[default]
   SinglePage,
   ExactPageSizes(Vec<usize>),
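The final-state arithmetic that moved into `PageStreamMetadata::write_to` is easy to sanity-check by hand (illustrative numbers, not taken from the codebase):

```rust
fn main() {
  // with ans_size_log = 10, valid decoder states live in
  // [table_size, 2 * table_size) = [1024, 2048)
  let ans_size_log = 10;
  let ans_final_state: usize = 1500;
  // write_to stores the state in exactly ans_size_log bits
  let written = ans_final_state - (1 << ans_size_log); // 476
  // and parsing recovers it by adding the table size back
  assert_eq!(written + (1 << ans_size_log), ans_final_state);
}
```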
diff --git a/pco/src/chunk_spec.rs b/pco/src/chunk_spec.rs
index df01e400..0d15349e 100644
--- a/pco/src/chunk_spec.rs
+++ b/pco/src/chunk_spec.rs
@@ -1,4 +1,4 @@
-use crate::chunk_metadata::DataPagingSpec;
+use crate::chunk_metadata::PagingSpec;
 use crate::errors::{PcoError, PcoResult};
 
 /// A specification for how many elements there will be in each of a chunk's
@@ -11,7 +11,7 @@ use crate::errors::{PcoError, PcoResult};
 /// reasons.
 #[derive(Clone, Debug, Default)]
 pub struct ChunkSpec {
-  data_paging_spec: DataPagingSpec,
+  paging_spec: PagingSpec,
 }
 
 impl ChunkSpec {
@@ -25,14 +25,14 @@ impl ChunkSpec {
   /// ```
   /// can only be used if the chunk actually contains 1+2+3=6 numbers.
   pub fn with_page_sizes(mut self, sizes: Vec<usize>) -> Self {
-    self.data_paging_spec = DataPagingSpec::ExactPageSizes(sizes);
+    self.paging_spec = PagingSpec::ExactPageSizes(sizes);
     self
   }
 
   pub(crate) fn page_sizes(&self, n: usize) -> PcoResult<Vec<usize>> {
-    let page_sizes = match &self.data_paging_spec {
-      DataPagingSpec::SinglePage => Ok(vec![n]),
-      DataPagingSpec::ExactPageSizes(sizes) => {
+    let page_sizes = match &self.paging_spec {
+      PagingSpec::SinglePage => Ok(vec![n]),
+      PagingSpec::ExactPageSizes(sizes) => {
        let sizes_n: usize = sizes.iter().sum();
        if sizes_n == n {
          Ok(sizes.clone())
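The `with_page_sizes` doc comment above elides its code example in this diff; concretely, usage looks something like the following (a sketch using only items from this diff):

```rust
// split a chunk of 1 + 2 + 3 = 6 numbers into data pages of sizes 1, 2, and 3
let spec = ChunkSpec::default().with_page_sizes(vec![1, 2, 3]);
```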
diff --git a/pco/src/delta_encoding.rs b/pco/src/delta_encoding.rs
index 61cdd8c5..ee99fc95 100644
--- a/pco/src/delta_encoding.rs
+++ b/pco/src/delta_encoding.rs
@@ -56,23 +56,20 @@ fn first_order_deltas_in_place<U: UnsignedLike>(dest: &mut Vec<U>) {
 pub fn nth_order_deltas<U: UnsignedLike>(
   unsigneds: &mut Vec<U>,
   order: usize,
-  data_page_idxs: &[usize],
+  page_idxs: &[usize],
 ) -> Vec<DeltaMoments<U>> {
   if order == 0 {
-    return data_page_idxs
-      .iter()
-      .map(|_| DeltaMoments::default())
-      .collect();
+    return page_idxs.iter().map(|_| DeltaMoments::default()).collect();
   }
 
-  let mut data_page_moments = vec![Vec::with_capacity(order); data_page_idxs.len()];
+  let mut page_moments = vec![Vec::with_capacity(order); page_idxs.len()];
   for _ in 0..order {
-    for (page_idx, &i) in data_page_idxs.iter().enumerate() {
-      data_page_moments[page_idx].push(unsigneds.get(i).copied().unwrap_or(U::ZERO));
+    for (page_idx, &i) in page_idxs.iter().enumerate() {
+      page_moments[page_idx].push(unsigneds.get(i).copied().unwrap_or(U::ZERO));
     }
     first_order_deltas_in_place(unsigneds);
   }
 
-  let moments = data_page_moments
+  let moments = page_moments
     .into_iter()
     .map(DeltaMoments::new)
     .collect::<Vec<_>>();
diff --git a/pco/src/float_mult_utils.rs b/pco/src/float_mult_utils.rs
index aa42f84c..bde4e123 100644
--- a/pco/src/float_mult_utils.rs
+++ b/pco/src/float_mult_utils.rs
@@ -6,7 +6,7 @@ use crate::data_types::{FloatLike, NumberLike, UnsignedLike};
 use crate::delta_encoding;
 use crate::unsigned_src_dst::{StreamSrc, UnsignedDst};
 
-// BodyDecompressor is already doing batching
+// PageDecompressor is already doing batching, so we don't need to batch here
 pub fn join_streams<U: UnsignedLike>(base: U::Float, dst: UnsignedDst<U>) {
   let (unsigneds, adjustments) = dst.decompose();
   delta_encoding::toggle_center_deltas_in_place(adjustments);
diff --git a/pco/src/lib.rs b/pco/src/lib.rs
index 42927752..b3ce45de 100644
--- a/pco/src/lib.rs
+++ b/pco/src/lib.rs
@@ -26,13 +26,13 @@ mod ans;
 mod auto;
 mod base_compressor;
 mod base_decompressor;
+mod batch_decompressor;
 mod bin;
 mod bin_optimization;
 mod bit_reader;
 mod bit_words;
 mod bit_writer;
 mod bits;
-mod body_decompressor;
 mod chunk_metadata;
 mod chunk_spec;
 mod compression_table;
@@ -41,7 +41,7 @@ mod delta_encoding;
 mod flags;
 mod float_mult_utils;
 mod modes;
-mod num_decompressor;
+mod page_decompressor;
 mod progress;
 mod unsigned_src_dst;
diff --git a/pco/src/body_decompressor.rs b/pco/src/page_decompressor.rs
similarity index 72%
rename from pco/src/body_decompressor.rs
rename to pco/src/page_decompressor.rs
index 4dc25076..9c8d32c4 100644
--- a/pco/src/body_decompressor.rs
+++ b/pco/src/page_decompressor.rs
@@ -1,24 +1,24 @@
 use std::cmp::min;
 use std::marker::PhantomData;
 
+use crate::batch_decompressor::BatchDecompressor;
 use crate::bit_reader::BitReader;
-use crate::chunk_metadata::DataPageMetadata;
+use crate::chunk_metadata::PageMetadata;
 use crate::constants::UNSIGNED_BATCH_SIZE;
 use crate::data_types::{NumberLike, UnsignedLike};
 use crate::delta_encoding::DeltaMoments;
 use crate::errors::PcoResult;
-use crate::num_decompressor::NumDecompressor;
 use crate::progress::Progress;
 use crate::unsigned_src_dst::UnsignedDst;
+use crate::{batch_decompressor, Mode};
 use crate::{delta_encoding, float_mult_utils, ChunkMetadata};
-use crate::{num_decompressor, Mode};
 
-// BodyDecompressor wraps NumDecompressor and handles reconstruction from
+// PageDecompressor wraps BatchDecompressor and handles reconstruction from
 // delta encoding.
 #[derive(Clone, Debug)]
-pub struct BodyDecompressor<T: NumberLike> {
+pub struct PageDecompressor<T: NumberLike> {
   mode: Mode<T::Unsigned>,
-  num_decompressor: Box<dyn NumDecompressor<T::Unsigned>>,
+  batch_decompressor: Box<dyn BatchDecompressor<T::Unsigned>>,
   delta_momentss: Vec<DeltaMoments<T::Unsigned>>, // one per stream
   secondary_stream: [T::Unsigned; UNSIGNED_BATCH_SIZE],
   phantom: PhantomData<T>,
@@ -39,28 +39,28 @@ fn join_streams<U: UnsignedLike>(mode: Mode<U>, dst: UnsignedDst<U>) {
   }
 }
 
-impl<T: NumberLike> BodyDecompressor<T> {
+impl<T: NumberLike> PageDecompressor<T> {
   pub(crate) fn new(
     n: usize,
     compressed_body_size: usize,
     chunk_meta: &ChunkMetadata<T::Unsigned>,
-    data_page_meta: DataPageMetadata<T::Unsigned>,
+    page_meta: PageMetadata<T::Unsigned>,
   ) -> PcoResult<Self> {
-    let delta_momentss = data_page_meta
+    let delta_momentss = page_meta
       .streams
       .iter()
       .map(|stream| stream.delta_moments.clone())
       .collect();
-    let num_decompressor = num_decompressor::new(
+    let batch_decompressor = batch_decompressor::new(
       n,
       compressed_body_size,
       chunk_meta,
-      data_page_meta,
+      page_meta,
     )?;
     Ok(Self {
       // we don't store the whole ChunkMeta because it can get large due to bins
       mode: chunk_meta.mode,
-      num_decompressor,
+      batch_decompressor,
       delta_momentss,
       secondary_stream: [T::Unsigned::default(); UNSIGNED_BATCH_SIZE],
       phantom: PhantomData,
@@ -77,29 +77,15 @@
     let batch_end = min(UNSIGNED_BATCH_SIZE, num_dst.len());
     let unsigneds_mut = T::transmute_to_unsigned_slice(&mut num_dst[..batch_end]);
     let Self {
-      num_decompressor,
+      batch_decompressor,
       delta_momentss,
       ..
     } = self;
-    let n_streams = self.mode.n_streams();
-
-    if let Some(initial_value_required) = num_decompressor.initial_value_required(0) {
-      unsigneds_mut.fill(initial_value_required);
-    }
-    if let Some(initial_value_required) = num_decompressor.initial_value_required(1) {
-      self.secondary_stream.fill(initial_value_required);
-    }
 
     let progress = {
       let mut u_dst = UnsignedDst::new(unsigneds_mut, &mut self.secondary_stream);
-      for stream_idx in 0..n_streams {
-        if let Some(initial_value) = num_decompressor.initial_value_required(stream_idx) {
-          u_dst.stream(stream_idx).fill(initial_value);
-        }
-      }
-
-      let progress = num_decompressor.decompress_unsigneds(
+      let progress = batch_decompressor.decompress_unsigneds(
         reader,
         error_on_insufficient_data,
         &mut u_dst,
@@ -143,6 +129,6 @@
   }
 
   pub fn bits_remaining(&self) -> usize {
-    self.num_decompressor.bits_remaining()
+    self.batch_decompressor.bits_remaining()
   }
 }
diff --git a/pco/src/standalone/compressor.rs b/pco/src/standalone/compressor.rs
index cf0d6dc1..c5408c03 100644
--- a/pco/src/standalone/compressor.rs
+++ b/pco/src/standalone/compressor.rs
@@ -78,7 +78,7 @@ impl<T: NumberLike> Compressor<T> {
       .chunk_metadata_internal(nums, &ChunkSpec::default())?;
     let post_meta_byte_idx = self.0.writer.byte_size();
 
-    self.0.data_page_internal()?;
+    self.0.page_internal()?;
     meta.compressed_body_size = self.0.writer.byte_size() - post_meta_byte_idx;
     meta.update_write_compressed_body_size(&mut self.0.writer, pre_meta_bit_idx);
diff --git a/pco/src/standalone/decompressor.rs b/pco/src/standalone/decompressor.rs
index 872d93e2..be20600c 100644
--- a/pco/src/standalone/decompressor.rs
+++ b/pco/src/standalone/decompressor.rs
@@ -2,9 +2,9 @@ use std::io::Write;
 
 use crate::base_decompressor::{BaseDecompressor, State, Step};
 use crate::bit_reader::BitReader;
-use crate::body_decompressor::BodyDecompressor;
 use crate::data_types::NumberLike;
 use crate::errors::{ErrorKind, PcoError, PcoResult};
+use crate::page_decompressor::PageDecompressor;
 use crate::progress::Progress;
 use crate::{ChunkMetadata, DecompressorConfig, Flags};
 
@@ -101,11 +101,11 @@ impl<T: NumberLike> Decompressor<T> {
   /// or runs out of data.
   pub fn skip_chunk_body(&mut self) -> PcoResult<()> {
     self.0.state.check_step_among(
-      &[Step::StartOfDataPage, Step::MidDataPage],
+      &[Step::StartOfPage, Step::MidPage],
       "skip chunk body",
     )?;
 
-    let bits_remaining = match &self.0.state.body_decompressor {
+    let bits_remaining = match &self.0.state.page_decompressor {
       Some(bd) => bd.bits_remaining(),
       None => {
         let meta = self.0.state.chunk_meta.as_ref().unwrap();
@@ -117,7 +117,7 @@
     if skipped_bit_idx <= self.0.words.total_bits() {
       self.0.state.bit_idx = skipped_bit_idx;
       self.0.state.chunk_meta = None;
-      self.0.state.body_decompressor = None;
+      self.0.state.page_decompressor = None;
       Ok(())
     } else {
       Err(PcoError::insufficient_data(format!(
@@ -136,13 +136,13 @@
     self
       .0
       .state
-      .check_step(Step::StartOfDataPage, "read chunk body")?;
+      .check_step(Step::StartOfPage, "read chunk body")?;
     let &ChunkMetadata {
       n,
       compressed_body_size,
       ..
     } = self.0.state.chunk_meta.as_ref().unwrap();
-    self.0.data_page_internal(n, compressed_body_size, dest)?;
+    self.0.page_internal(n, compressed_body_size, dest)?;
     self.0.state.chunk_meta = None;
     Ok(())
   }
@@ -164,7 +164,7 @@
 
 fn next_nums_dirty<T: NumberLike>(
   reader: &mut BitReader,
-  bd: &mut BodyDecompressor<T>,
+  bd: &mut PageDecompressor<T>,
   dest: &mut [T],
 ) -> PcoResult<Progress> {
   bd.decompress(reader, false, dest)
@@ -180,7 +180,7 @@ fn apply_nums<T: NumberLike>(
   } else {
     if progress.finished_body {
       state.chunk_meta = None;
-      state.body_decompressor = None;
+      state.page_decompressor = None;
     }
     Some(DecompressedItem::Numbers(
       dest[..progress.n_processed].to_vec(),
@@ -205,13 +205,13 @@ impl<T: NumberLike> Iterator for &mut Decompressor<T> {
         Err(e) if matches!(e.kind, ErrorKind::InsufficientData) => Ok(None),
         Err(e) => Err(e),
       },
-      Step::StartOfDataPage => self.0.with_reader(|reader, state, config| {
+      Step::StartOfPage => self.0.with_reader(|reader, state, config| {
         let &ChunkMetadata {
           n,
           compressed_body_size,
           ..
         } = state.chunk_meta.as_ref().unwrap();
-        let maybe_bd = state.new_body_decompressor(reader, n, compressed_body_size);
+        let maybe_bd = state.new_page_decompressor(reader, n, compressed_body_size);
         if let Err(e) = &maybe_bd {
           if matches!(e.kind, ErrorKind::InsufficientData) {
             return Ok(None);
           }
         }
         let mut bd = maybe_bd?;
         let mut dest = vec![T::default(); config.numbers_limit_per_item];
         let progress = next_nums_dirty(reader, &mut bd, &mut dest)?;
-        state.body_decompressor = Some(bd);
+        state.page_decompressor = Some(bd);
         Ok(apply_nums(state, dest, progress))
       }),
-      Step::MidDataPage => self.0.with_reader(|reader, state, config| {
+      Step::MidPage => self.0.with_reader(|reader, state, config| {
         let mut dest = vec![T::default(); config.numbers_limit_per_item];
         let progress = next_nums_dirty(
           reader,
-          state.body_decompressor.as_mut().unwrap(),
+          state.page_decompressor.as_mut().unwrap(),
           &mut dest,
         )?;
         Ok(apply_nums(state, dest, progress))
diff --git a/pco/src/tests/utils.rs b/pco/src/tests/utils.rs
index 782ede06..c0a4c725 100644
--- a/pco/src/tests/utils.rs
+++ b/pco/src/tests/utils.rs
@@ -44,7 +44,7 @@ pub fn wrapped_compress<T: NumberLike>(
   res.extend(meta);
 
   for size in sizes {
-    compressor.data_page()?;
+    compressor.page()?;
     let page = compressor.drain_bytes();
     res.extend(encode_usize(page.len()));
     res.extend(encode_usize(size));
@@ -88,7 +88,7 @@ pub fn wrapped_decompress<T: NumberLike>(
     res.reserve(size);
     unsafe { res.set_len(res.len() + size) };
     decompressor.write_all(&buf[..page_len]).unwrap();
-    decompressor.data_page(size, page_len, &mut res[i..])?;
+    decompressor.page(size, page_len, &mut res[i..])?;
     i += size;
     decompressor.free_compressed_memory();
     buf = &mut buf[page_len..];
diff --git a/pco/src/wrapped/compressor.rs b/pco/src/wrapped/compressor.rs
index c2d744f7..99994ac2 100644
--- a/pco/src/wrapped/compressor.rs
+++ b/pco/src/wrapped/compressor.rs
@@ -74,8 +74,8 @@ impl<T: NumberLike> Compressor<T> {
   /// [`.chunk_metadata`][Self::chunk_metadata].
   /// Will return an error if the compressor is not at the start of a data
   /// page in the middle of a chunk.
-  pub fn data_page(&mut self) -> PcoResult<()> {
-    self.0.data_page_internal()
+  pub fn page(&mut self) -> PcoResult<()> {
+    self.0.page_internal()
   }
 
   /// Returns all bytes produced by the compressor so far that have not yet
diff --git a/pco/src/wrapped/decompressor.rs b/pco/src/wrapped/decompressor.rs
index cbf01424..3f01e24a 100644
--- a/pco/src/wrapped/decompressor.rs
+++ b/pco/src/wrapped/decompressor.rs
@@ -45,7 +45,7 @@ impl<T: NumberLike> Decompressor<T> {
   /// reading all data pages from the preceding chunk.
   pub fn chunk_metadata(&mut self) -> PcoResult<ChunkMetadata<T::Unsigned>> {
     self.0.state.check_step_among(
-      &[Step::StartOfChunk, Step::StartOfDataPage, Step::MidDataPage],
+      &[Step::StartOfChunk, Step::StartOfPage, Step::MidPage],
       "read chunk metadata",
     )?;
 
@@ -53,7 +53,7 @@
       let meta = ChunkMetadata::<T::Unsigned>::parse_from(reader, state.flags.as_ref().unwrap())?;
       state.chunk_meta = Some(meta.clone());
-      state.body_decompressor = None;
+      state.page_decompressor = None;
       Ok(meta)
     })
   }
@@ -65,14 +65,14 @@
   ///
   /// This can be used regardless of whether the decompressor has finished
   /// reading the previous data page.
-  pub fn begin_data_page(&mut self, n: usize, compressed_page_size: usize) -> PcoResult<()> {
+  pub fn begin_page(&mut self, n: usize, compressed_page_size: usize) -> PcoResult<()> {
     self.0.state.check_step_among(
-      &[Step::StartOfDataPage, Step::MidDataPage],
+      &[Step::StartOfPage, Step::MidPage],
       "begin data page",
     )?;
     self.0.with_reader(|reader, state, _| {
-      state.body_decompressor =
-        Some(state.new_body_decompressor(reader, n, compressed_page_size)?);
+      state.page_decompressor =
+        Some(state.new_page_decompressor(reader, n, compressed_page_size)?);
       Ok(())
     })
   }
@@ -81,15 +81,12 @@
   /// Will return an error if the decompressor is not in a data page,
   /// it runs out of data, or any corruptions are found.
   pub fn next_batch(&mut self, dest: &mut [T]) -> PcoResult<()> {
-    self
-      .0
-      .state
-      .check_step(Step::MidDataPage, "read next batch")?;
+    self.0.state.check_step(Step::MidPage, "read next batch")?;
     self.0.with_reader(|reader, state, _| {
-      let bd = state.body_decompressor.as_mut().unwrap();
+      let bd = state.page_decompressor.as_mut().unwrap();
       let batch_res = bd.decompress(reader, true, dest)?;
       if batch_res.finished_body {
-        state.body_decompressor = None;
+        state.page_decompressor = None;
       }
       Ok(())
     })
@@ -99,19 +96,14 @@
   /// Will return an error if the decompressor is not in a chunk,
   /// it runs out of data, or any corruptions are found.
   ///
-  /// This is similar to calling [`.begin_data_page`][Self::begin_data_page] and then
+  /// This is similar to calling [`.begin_page`][Self::begin_page] and then
   /// [`.next_batch(usize::MAX)`][Self::next_batch].
-  pub fn data_page(
-    &mut self,
-    n: usize,
-    compressed_page_size: usize,
-    dest: &mut [T],
-  ) -> PcoResult<()> {
+  pub fn page(&mut self, n: usize, compressed_page_size: usize, dest: &mut [T]) -> PcoResult<()> {
     self.0.state.check_step_among(
-      &[Step::StartOfDataPage, Step::MidDataPage],
+      &[Step::StartOfPage, Step::MidPage],
       "data page",
     )?;
-    self.0.data_page_internal(n, compressed_page_size, dest)
+    self.0.page_internal(n, compressed_page_size, dest)
   }
 
   /// Frees memory used for storing compressed bytes the decompressor has
@@ -125,7 +117,7 @@
   /// As an example, if you want to read the first 5 numbers from each
   /// data page, you might write each compressed data page to the decompressor,
   /// then repeatedly call
-  /// [`.begin_data_page`][Self::begin_data_page],
+  /// [`.begin_page`][Self::begin_page],
   /// [`.next_nums`][Self::next_batch], and
   /// this method.
   pub fn clear_compressed_bytes(&mut self) {
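Taken together with `tests/utils.rs` above, the renamed wrapped-mode read path looks roughly like this (a sketch, not a definitive API reference: the `pco::wrapped` and `pco::errors` module paths are assumed from the file layout, and header/chunk-metadata handling is elided):

```rust
use std::io::Write;

use pco::errors::PcoResult;
use pco::wrapped::Decompressor;

/// Reads one data page of `n` numbers, assuming the decompressor has already
/// consumed the header and this chunk's metadata, as in tests/utils.rs.
fn read_one_page(
  decompressor: &mut Decompressor<i64>,
  page_bytes: &[u8],
  n: usize, // the number count per page is tracked by the wrapping format
) -> PcoResult<Vec<i64>> {
  decompressor.write_all(page_bytes).unwrap();
  let mut dest = vec![0; n];
  // one-shot equivalent of begin_page + next_batch over the whole page
  decompressor.page(n, page_bytes.len(), &mut dest)?;
  decompressor.free_compressed_memory();
  Ok(dest)
}
```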