Misc renames and cleanup (#107)
mwlon authored Jul 18, 2023
1 parent c59366e commit e9876ce
Showing 16 changed files with 162 additions and 181 deletions.
pco/README.md: 4 changes (2 additions & 2 deletions)
@@ -31,8 +31,8 @@ fn main() {
 To run something right away, try
 [the benchmarks](../bench/README.md).
 
-For a lower-level standalone API that allows writing/reading one chunk at a time and
-extracting all metadata, see [the docs.rs documentation](https://docs.rs/pco/latest/pco/).
+For a lower-level standalone API that allows writing one chunk at a time /
+streaming reads, see [the docs.rs documentation](https://docs.rs/pco/latest/pco/).
 
 ## Usage as a Wrapped Format

pco/src/ans/decoding.rs: 8 changes (2 additions & 6 deletions)
@@ -60,17 +60,13 @@ impl Decoder {
   #[inline]
   pub fn unchecked_decode(&mut self, reader: &mut BitReader) -> Token {
     let node = &self.nodes[self.state - self.table_size];
-    let bits_read = reader.unchecked_read_uint::<usize>(node.bits_to_read);
-    let next_state = node.next_state_base + bits_read;
-    self.state = next_state;
+    self.state = node.next_state_base + reader.unchecked_read_uint::<usize>(node.bits_to_read);
     node.token
   }
 
   pub fn decode(&mut self, reader: &mut BitReader) -> PcoResult<Token> {
     let node = &self.nodes[self.state - self.table_size];
-    let bits_read = reader.read_small(node.bits_to_read)?;
-    let next_state = node.next_state_base + bits_read;
-    self.state = next_state;
+    self.state = node.next_state_base + reader.read_small(node.bits_to_read)?;
     Ok(node.token)
   }
 }
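
For context beyond the diff: both methods perform one step of a tabled asymmetric numeral system (tANS) decode, and the change above just fuses the three-line state update into a single expression. Below is a minimal, self-contained sketch of that decode step; `Node`, `Decoder`, and `read_bits` are illustrative stand-ins for pco's internals, not its public API.

```rust
/// One table node per state in [table_size, 2 * table_size).
struct Node {
  token: u8,             // symbol emitted when the decoder visits this state
  bits_to_read: usize,   // how many fresh bits refill the state
  next_state_base: usize,
}

struct Decoder {
  nodes: Vec<Node>,
  table_size: usize, // 1 << size_log
  state: usize,      // invariant: table_size <= state < 2 * table_size
}

impl Decoder {
  /// Mirrors the fused state update in the diff above.
  fn decode_step(&mut self, read_bits: &mut impl FnMut(usize) -> usize) -> u8 {
    // states are offset by table_size, so subtract to index the table
    let node = &self.nodes[self.state - self.table_size];
    // refill: the freshly read bits move the state back into range
    self.state = node.next_state_base + read_bits(node.bits_to_read);
    node.token
  }
}

fn main() {
  // degenerate single-symbol table: table_size = 1, state is always 1
  let mut decoder = Decoder {
    nodes: vec![Node { token: 42, bits_to_read: 0, next_state_base: 1 }],
    table_size: 1,
    state: 1,
  };
  let mut read_bits = |_n: usize| 0usize; // this toy table needs no bits
  assert_eq!(decoder.decode_step(&mut read_bits), 42);
}
```
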
pco/src/base_compressor.rs: 29 changes (16 additions & 13 deletions)
@@ -3,7 +3,7 @@ use std::fmt::Debug;
 
 use crate::bin::{Bin, BinCompressionInfo};
 use crate::bit_writer::BitWriter;
-use crate::chunk_metadata::{ChunkMetadata, ChunkStreamMetadata};
+use crate::chunk_metadata::{ChunkMetadata, ChunkStreamMetadata, PageMetadata, PageStreamMetadata};
 use crate::chunk_spec::ChunkSpec;
 use crate::compression_table::CompressionTable;
 use crate::constants::*;
@@ -454,7 +454,7 @@ pub struct MidChunkInfo<U: UnsignedLike> {
 }
 
 impl<U: UnsignedLike> MidChunkInfo<U> {
-  fn data_page_moments(&self, stream_idx: usize) -> &DeltaMoments<U> {
+  fn page_moments(&self, stream_idx: usize) -> &DeltaMoments<U> {
     &self.stream_configs[stream_idx].delta_momentss[self.page_idx]
   }
 
@@ -660,28 +660,31 @@ impl<T: NumberLike> BaseCompressor<T> {
     Ok(meta)
   }
 
-  pub fn data_page_internal(&mut self) -> PcoResult<()> {
+  pub fn page_internal(&mut self) -> PcoResult<()> {
     let info = match &mut self.state {
       State::MidChunk(info) => Ok(info),
       other => Err(other.wrong_step_err("data page")),
     }?;
 
     let decomposeds = decompose_unsigneds(info)?;
 
+    let mut streams = Vec::with_capacity(info.n_streams);
     for stream_idx in 0..info.n_streams {
-      info
-        .data_page_moments(stream_idx)
-        .write_to(&mut self.writer);
+      let delta_moments = info.page_moments(stream_idx).clone();
 
-      // write the final ANS state, moving it down the range [0, table_size)
-      let size_log = info.stream_configs[stream_idx].encoder.size_log();
-      let final_state = decomposeds.ans_final_state(stream_idx);
-      self
-        .writer
-        .write_usize(final_state - (1 << size_log), size_log);
+      let ans_final_state = decomposeds.ans_final_state(stream_idx);
+      streams.push(PageStreamMetadata {
+        delta_moments,
+        ans_final_state,
+      });
     }
 
-    self.writer.finish_byte();
+    let page_meta = PageMetadata { streams };
+    let ans_size_logs = info
+      .stream_configs
+      .iter()
+      .map(|config| config.encoder.size_log());
+    page_meta.write_to(ans_size_logs, &mut self.writer);
 
     match info.n_nontrivial_streams {
       0 => write_decomposeds::<_, 0>(
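
An aside on the inline serialization deleted above (now handled by `PageMetadata::write_to`): the decoder's `self.state - self.table_size` indexing suggests ANS states live in `[table_size, 2 * table_size)` with `table_size = 1 << size_log`, so subtracting `table_size` ("moving it down the range [0, table_size)", per the old comment) makes the final state fit in exactly `size_log` bits. A runnable illustration with made-up numbers:

```rust
fn main() {
  let size_log = 10usize;
  let table_size = 1usize << size_log; // 1024
  let final_state = 1500usize; // some ANS state in [1024, 2048)

  // what the old code passed to write_usize(..., size_log)
  let written = final_state - table_size; // 476
  assert!(written < table_size); // fits in size_log = 10 bits

  // a decoder recovers the state by adding table_size back
  assert_eq!(written + table_size, final_state);
}
```
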
pco/src/base_decompressor.rs: 44 changes (22 additions & 22 deletions)
@@ -3,10 +3,10 @@ use std::io::Write;
 
 use crate::bit_reader::BitReader;
 use crate::bit_words::PaddedBytes;
-use crate::body_decompressor::BodyDecompressor;
-use crate::chunk_metadata::{ChunkMetadata, DataPageMetadata};
+use crate::chunk_metadata::{ChunkMetadata, PageMetadata};
 use crate::constants::{MAGIC_CHUNK_BYTE, MAGIC_HEADER, MAGIC_TERMINATION_BYTE};
 use crate::data_types::NumberLike;
+use crate::page_decompressor::PageDecompressor;
 
 use crate::errors::{PcoError, PcoResult};
 use crate::Flags;
@@ -33,7 +33,7 @@ pub struct State<T: NumberLike> {
   pub bit_idx: usize,
   pub flags: Option<Flags>,
   pub chunk_meta: Option<ChunkMetadata<T::Unsigned>>,
-  pub body_decompressor: Option<BodyDecompressor<T>>,
+  pub page_decompressor: Option<PageDecompressor<T>>,
   pub terminated: bool,
 }
 
@@ -91,31 +91,31 @@ impl<T: NumberLike> State<T> {
     ChunkMetadata::<T::Unsigned>::parse_from(reader, self.flags.as_ref().unwrap()).map(Some)
   }
 
-  pub fn new_body_decompressor(
+  pub fn new_page_decompressor(
     &self,
     reader: &mut BitReader,
     n: usize,
     compressed_page_size: usize,
-  ) -> PcoResult<BodyDecompressor<T>> {
+  ) -> PcoResult<PageDecompressor<T>> {
     let start_bit_idx = reader.bit_idx();
-    let res = self.new_body_decompressor_dirty(reader, n, compressed_page_size);
+    let res = self.new_page_decompressor_dirty(reader, n, compressed_page_size);
 
     if res.is_err() {
       reader.seek_to(start_bit_idx);
     }
     res
   }
 
-  fn new_body_decompressor_dirty(
+  fn new_page_decompressor_dirty(
     &self,
     reader: &mut BitReader,
     n: usize,
     compressed_page_size: usize,
-  ) -> PcoResult<BodyDecompressor<T>> {
+  ) -> PcoResult<PageDecompressor<T>> {
     let chunk_meta = self.chunk_meta.as_ref().unwrap();
 
     let start_byte_idx = reader.aligned_byte_idx()?;
-    let data_page_meta = DataPageMetadata::parse_from(reader, chunk_meta)?;
+    let page_meta = PageMetadata::parse_from(reader, chunk_meta)?;
     let end_byte_idx = reader.aligned_byte_idx()?;
 
     let compressed_body_size = compressed_page_size
@@ -124,11 +124,11 @@ impl<T: NumberLike> State<T> {
         PcoError::corruption("compressed page size {} is less than data page metadata size")
       })?;
 
-    BodyDecompressor::new(
+    PageDecompressor::new(
       n,
       compressed_body_size,
       chunk_meta,
-      data_page_meta,
+      page_meta,
     )
   }
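
The hunk above also shows the size bookkeeping: the page metadata's on-disk size is measured as the difference between the two aligned byte indices, and the remaining body size is derived with a `checked_sub` that reports corruption on underflow. A small sketch of that arithmetic, with illustrative names rather than pco's types:

```rust
fn body_size(
  compressed_page_size: usize,
  start_byte_idx: usize,
  end_byte_idx: usize,
) -> Result<usize, String> {
  // bytes the page metadata occupied between the two aligned reads
  let meta_size = end_byte_idx - start_byte_idx;
  compressed_page_size
    .checked_sub(meta_size)
    .ok_or_else(|| "compressed page size is less than page metadata size".to_string())
}

fn main() {
  assert_eq!(body_size(100, 8, 20), Ok(88)); // 12 bytes of metadata
  assert!(body_size(10, 8, 20).is_err()); // corrupt: metadata exceeds the page
}
```
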

Expand All @@ -139,10 +139,10 @@ impl<T: NumberLike> State<T> {
Step::Terminated
} else if self.chunk_meta.is_none() {
Step::StartOfChunk
} else if self.body_decompressor.is_none() {
Step::StartOfDataPage
} else if self.page_decompressor.is_none() {
Step::StartOfPage
} else {
Step::MidDataPage
Step::MidPage
}
}
}
@@ -151,8 +151,8 @@
 pub enum Step {
   PreHeader,
   StartOfChunk,
-  StartOfDataPage,
-  MidDataPage,
+  StartOfPage,
+  MidPage,
   Terminated,
 }

@@ -161,8 +161,8 @@ impl Step {
     let step_str = match self {
       Step::PreHeader => "has not yet parsed header",
       Step::StartOfChunk => "is at the start of a chunk",
-      Step::StartOfDataPage => "is at the start of a data page",
-      Step::MidDataPage => "is mid-data-page",
+      Step::StartOfPage => "is at the start of a data page",
+      Step::MidPage => "is mid-data-page",
       Step::Terminated => "has already parsed the footer",
     };
     PcoError::invalid_argument(format!(
@@ -228,18 +228,18 @@ impl<T: NumberLike> BaseDecompressor<T> {
     })
   }
 
-  pub fn data_page_internal(
+  pub fn page_internal(
     &mut self,
     n: usize,
     compressed_page_size: usize,
     dest: &mut [T],
   ) -> PcoResult<()> {
-    let old_bd = self.state.body_decompressor.clone();
+    let old_bd = self.state.page_decompressor.clone();
     self.with_reader(|reader, state, _| {
-      let mut bd = state.new_body_decompressor(reader, n, compressed_page_size)?;
+      let mut bd = state.new_page_decompressor(reader, n, compressed_page_size)?;
       let res = bd.decompress(reader, true, dest);
       // we need to roll back the body decompressor if this failed
-      state.body_decompressor = if res.is_ok() { None } else { old_bd };
+      state.page_decompressor = if res.is_ok() { None } else { old_bd };
       res?;
       Ok(())
     })
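
A closing note on the pattern visible in `page_internal` and `new_page_decompressor` above: both snapshot some piece of state before a fallible operation (the old page decompressor, or the reader's bit index) and restore it on failure, so a failed read leaves the decompressor reusable. A generic sketch of that snapshot-and-restore idea; the helper name and signature are illustrative, not part of pco:

```rust
/// Run a fallible operation against some mutable state, restoring the
/// prior value if the operation errors.
fn attempt_with_rollback<S: Clone, T, E>(
  state: &mut S,
  op: impl FnOnce(&mut S) -> Result<T, E>,
) -> Result<T, E> {
  let snapshot = state.clone(); // cheap when S is a small Option, as here
  let res = op(state);
  if res.is_err() {
    *state = snapshot; // roll back on failure
  }
  res
}

fn main() {
  let mut state: Option<u32> = Some(1);
  let res: Result<(), &str> = attempt_with_rollback(&mut state, |s| {
    *s = Some(2);
    Err("simulated mid-page failure")
  });
  assert!(res.is_err());
  assert_eq!(state, Some(1)); // the prior state was restored
}
```
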