benchmarks and cli dependencies

mwlon committed Jul 8, 2023
1 parent 4b35004 commit 41145fd
Showing 9 changed files with 595 additions and 235 deletions.
756 changes: 560 additions & 196 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
@@ -6,5 +6,4 @@ members = [
  "pco_cli",
  "quantile-compression/ffi",
  "quantile-compression/q_compress",
- "quantile-compression/q_compress_cli",
]
2 changes: 1 addition & 1 deletion bench/Cargo.toml
@@ -13,4 +13,4 @@ pco = {path = "../pco" }
q_compress = {path = "../quantile-compression/q_compress" }
structopt = "0.3.26"
tabled = "0.12.2"
zstd = "0.10"
zstd = "0.12"
18 changes: 8 additions & 10 deletions bench/benchmarks.md
@@ -1,5 +1,5 @@
All figures reported here are calculated using a single thread on a
-2.8GHz i5 CPU, operating on in-memory data, using Rust 1.66.
+2.8GHz i5 CPU, operating on in-memory data, using Rust 1.70.
Benchmarks were done by averaging 100 runs on a dataset of 1M numbers
with `compression_level` 8.

@@ -8,15 +8,13 @@ per second with 2 significant figures.
Compression ratio is reported with 3 significant figures.
For the `i64` heavy-tail integers, a Lomax distribution with alpha parameter 0.5 and median 1000 was used.

-| dataset                        | compression speed / (million/s) | decompression speed / (million/s) | compression ratio |
-|--------------------------------|---------------------------------|-----------------------------------|-------------------|
-| `i64` constant                 | 62                              | 480                               | 216,000           |
-| `i64` sparse                   | 77                              | 290                               | 597               |
-| `i64` uniform (incompressible) | 14                              | 68                                | 1.00              |
-| `i64` heavy-tail integers      | 14                              | 48                                | 4.63              |
-| `f64` standard normal          | 11                              | 40                                | 1.15              |
-| `f64` slow cosine              | 13                              | 40                                | 4.36              |
-| `TimestampMicros` millis       | 11                              | 47                                | 2.14              |
+| dataset            | compression speed / (million/s) | decompression speed / (million/s) | compression ratio |
+|--------------------|---------------------------------|-----------------------------------|-------------------|
+| `f64_decimal`      | 10                              | 63                                | 4.67              |
+| `f64_slow_cosine`  | 13                              | 91                                | 4.35              |
+| `i64_lomax05_long` | 14                              | 140                               | 4.62              |
+| `i64_sparse`       | 36                              | 220                               | 792               |
+| `micros_millis`    | 11                              | 120                               | 2.08              |

`i64` and `f64` are each 8 bytes, so for the more interesting distributions
(e.g. heavy-tail integers and standard normal),
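(A rough conversion for scale, not part of the diff itself: at 14 million `i64`s per second compressed, throughput is about 14,000,000 × 8 bytes ≈ 110 MB/s, and the 140 million/s decompression figure works out to roughly 1.1 GB/s.)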
7 changes: 4 additions & 3 deletions pco_cli/Cargo.toml
@@ -17,10 +17,11 @@ path = "src/main.rs"

[dependencies]
anyhow = "1.0.53"
arrow = {version = "9.0.2", features = ["csv"], default-features=false}
parquet = {version = "9.0.2", features = ["arrow", "base64", "snap", "zstd"], default-features=false}
arrow = {version = "43.0.0", features = ["csv"], default-features=false}
num-complex = "0.4.3"
parquet = {version = "43.0.0", features = ["arrow", "base64", "snap", "zstd"], default-features=false}
structopt = "0.3.26"
pco = {version = "0.0.0-alpha.0", path = "../pco" }

[dev-dependencies]
enum-iterator = "0.7.0"
enum-iterator = "1.4.1"
2 changes: 1 addition & 1 deletion pco_cli/src/compress.rs
@@ -40,7 +40,7 @@ fn infer_csv_schema(path: &Path, opt: &CompressOpt) -> Result<Schema> {
));
}
_ => {
-fields.push(field.clone());
+fields.push(field.as_ref().clone());
}
}
}
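The one-line change above reflects that newer arrow versions store schema fields as `FieldRef` (an `Arc<Field>`), so a plain `clone()` yields another `Arc` rather than an owned `Field`. A minimal sketch of the distinction (the field here is hypothetical, not from this repo):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};

fn main() {
    let field_ref: Arc<Field> = Arc::new(Field::new("x", DataType::Int64, false));
    // Cloning the Arc only bumps the reference count:
    let _shared: Arc<Field> = field_ref.clone();
    // as_ref().clone() produces an owned Field, which is what a
    // Vec<Field> (e.g. for building a new Schema) requires:
    let owned: Field = field_ref.as_ref().clone();
    assert_eq!(owned.name(), "x");
}
```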
34 changes: 17 additions & 17 deletions pco_cli/src/compress_handler.rs
@@ -2,15 +2,14 @@ use std::fs::{File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::Path;
-use std::sync::Arc;

use anyhow::Result;
+use arrow::csv;
use arrow::csv::Reader as CsvReader;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::reader::SerializedFileReader;
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+use parquet::arrow::ProjectionMask;

use pco::data_types::NumberLike;
use pco::standalone::Compressor;
@@ -125,10 +124,16 @@ struct ParquetColumnReader<T> {

impl<P: NumberLikeArrow> ColumnReader<P> for ParquetColumnReader<P> {
fn new(schema: &Schema, path: &Path, opt: &CompressOpt) -> Result<Self> {
-let reader = SerializedFileReader::new(File::open(path)?)?;
-let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
+let file = File::open(path)?;
+let batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+let parquet_schema = parquet::arrow::arrow_to_parquet_schema(schema)?;
let col_idx = utils::find_col_idx(schema, opt);
-let batch_reader = arrow_reader.get_record_reader_by_columns(vec![col_idx], opt.chunk_size)?;
+let batch_reader = batch_reader_builder
+  .with_projection(ProjectionMask::leaves(
+    &parquet_schema,
+    vec![col_idx],
+  ))
+  .build()?;
Ok(Self {
batch_reader,
phantom: PhantomData,
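For reference, a self-contained sketch of the newer parquet arrow-reader API that the hunk above migrates to (the path and column index are placeholders; borrowing the schema via `builder.parquet_schema()` is an alternative to the `arrow_to_parquet_schema` call used in the diff):

```rust
use std::fs::File;

use anyhow::Result;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn read_single_column(path: &str, col_idx: usize) -> Result<()> {
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Project down to one leaf column; the builder already knows the
    // parquet schema, so it can be borrowed directly.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![col_idx]);
    let reader = builder.with_projection(mask).build()?;
    for batch in reader {
        // Each item is an arrow RecordBatch containing only the
        // projected column.
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```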
@@ -156,16 +161,11 @@ impl<P: NumberLikeArrow> ColumnReader<P> for CsvColumnReader<P> {
where
Self: Sized,
{
-let csv_reader = CsvReader::from_reader(
-  File::open(path)?,
-  SchemaRef::new(schema.clone()),
-  opt.csv_has_header()?,
-  Some(opt.delimiter as u8),
-  opt.chunk_size,
-  None,
-  None,
-  Some(opt.timestamp_format.clone()),
-);
+let csv_reader = csv::ReaderBuilder::new(SchemaRef::new(schema.clone()))
+  .has_header(opt.csv_has_header()?)
+  .with_batch_size(opt.chunk_size)
+  .with_delimiter(opt.delimiter as u8)
+  .build(File::open(path)?)?;
let col_idx = utils::find_col_idx(schema, opt);

Ok(Self {
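Similarly, the csv change swaps arrow's old eight-argument positional constructor for the builder pattern. A minimal standalone sketch against the arrow 43 API (the schema and file name are hypothetical):

```rust
use std::fs::File;
use std::sync::Arc;

use anyhow::Result;
use arrow::csv::ReaderBuilder;
use arrow::datatypes::{DataType, Field, Schema};

fn read_csv_column() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
    ]));
    let reader = ReaderBuilder::new(schema)
        .has_header(true)
        .with_batch_size(8192)
        .with_delimiter(b',')
        .build(File::open("data.csv")?)?;
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```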
8 changes: 4 additions & 4 deletions pco_cli/src/dtype.rs
@@ -8,7 +8,7 @@ use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use pco::data_types::NumberLike;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-#[cfg_attr(test, derive(enum_iterator::IntoEnumIterator))]
+#[cfg_attr(test, derive(enum_iterator::Sequence))]
pub enum DType {
F32,
F64,
@@ -111,13 +111,13 @@ mod tests {
use std::str::FromStr;

use anyhow::Result;
-use enum_iterator::IntoEnumIterator;
+use enum_iterator::all;

use crate::dtype::DType;

#[test]
fn test_arrow_dtypes_consistent() -> Result<()> {
-for dtype in DType::into_enum_iter() {
+for dtype in all::<DType>() {
if let Ok(arrow_dtype) = dtype.to_arrow() {
assert_eq!(DType::from_arrow(&arrow_dtype)?, dtype);
}
@@ -127,7 +127,7 @@

#[test]
fn test_dtype_nameable() -> Result<()> {
-for dtype in DType::into_enum_iter() {
+for dtype in all::<DType>() {
let name = format!("{:?}", dtype);
let recovered = DType::from_str(&name)?;
assert_eq!(recovered, dtype);
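The enum-iterator 0.7 → 1.x migration shown in this file renames the derive from `IntoEnumIterator` to `Sequence` and moves iteration into a free function. A minimal sketch with a hypothetical enum:

```rust
use enum_iterator::{all, Sequence};

#[derive(Clone, Copy, Debug, PartialEq, Sequence)]
enum Color {
    Red,
    Green,
    Blue,
}

fn main() {
    // all::<T>() replaces the 0.x `T::into_enum_iter()` method.
    for color in all::<Color>() {
        println!("{:?}", color);
    }
}
```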
2 changes: 0 additions & 2 deletions quantile-compression/q_compress/Cargo.toml
@@ -19,9 +19,7 @@ timestamps_96 = []
[dev-dependencies]
futures = "0.3.21"
rand = "0.8.4"
structopt = "0.3.26"
tokio = {version = "1.19.2", features = ["full"]}
zstd = "0.10"

[[example]]
name="wrapped_time_series"
