Skip to content

Commit

Permalink
Propagate errors to python and drop redundant lists iterations (#8)
Browse files Browse the repository at this point in the history
On branch comments-from-review
 Changes to be committed:
	modified:   src/lib.rs
  • Loading branch information
SemyonSinchenko authored Sep 19, 2024
1 parent 553c517 commit 0a3d486
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ fn generate_groupby(
nas: i64,
seed: i64,
batch_size: i64,
) -> PyArrowType<RecordBatch> {
let distr_k = Uniform::<i64>::try_from(1..=k).unwrap();
let distr_nk = Uniform::<i64>::try_from(1..=(n / k)).unwrap();
let distr_5 = Uniform::<i64>::try_from(1..=5).unwrap();
let distr_15 = Uniform::<i64>::try_from(1..=15).unwrap();
let distr_float = Uniform::<f64>::try_from(0.0..=100.0).unwrap();
let distr_nas = Uniform::<i64>::try_from(0..=100).unwrap();
) -> PyResult<PyArrowType<RecordBatch>> {
let distr_k = Uniform::<i64>::try_from(1..=k)?;
let distr_nk = Uniform::<i64>::try_from(1..=(n / k))?;
let distr_5 = Uniform::<i64>::try_from(1..=5)?;
let distr_15 = Uniform::<i64>::try_from(1..=15)?;
let distr_float = Uniform::<f64>::try_from(0.0..=100.0)?;
let distr_nas = Uniform::<i64>::try_from(0..=100)?;
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);

let item_capacity = batch_size as usize;
let item_capacity = batch_size as usize; // validataion is on the python side

let mut id1_builder = StringBuilder::with_capacity(item_capacity, item_capacity * 8 * 5); // id{:03}, utf8
let mut id2_builder = StringBuilder::with_capacity(item_capacity, item_capacity * 8 * 5); // id{:03}, utf8
Expand Down Expand Up @@ -139,10 +139,9 @@ fn generate_groupby(
Arc::new(v2_builder.finish()),
Arc::new(v3_builder.finish()),
],
)
.unwrap();
).unwrap();

PyArrowType(batch)
Ok(PyArrowType(batch))
}

/**
Expand Down Expand Up @@ -170,13 +169,13 @@ fn generate_join_dataset_small(
seed: i64,
keys_seed: i64,
batch_size: i64,
) -> PyArrowType<RecordBatch> {
) -> PyResult<PyArrowType<RecordBatch>> {
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
k1.shuffle(&mut keys_rng);

let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;

// original R line (43:44)
// x = key[seq.int(1, n*0.9)],
Expand All @@ -194,16 +193,16 @@ fn generate_join_dataset_small(

kx.append(&mut kr);

let item_capacity = batch_size as usize;
let len_of_max_key = kx.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
let item_capacity = batch_size as usize; // validation is on the python side
let len_of_max_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec

let mut id1_builder = Int64Builder::with_capacity(item_capacity);
let mut id4_builder =
StringBuilder::with_capacity(item_capacity, item_capacity * 8 * len_of_max_key); // utf8
let mut v2_builder = Float64Builder::with_capacity(item_capacity);

for _i in 0..batch_size {
let k1 = kx.choose(&mut rng).unwrap();
let k1 = kx.choose(&mut rng).unwrap(); // we know 100% that kx is non empty
id1_builder.append_value(*k1);
id4_builder.append_value(format!("id{}", k1));
v2_builder.append_value(distr_float.sample(&mut rng));
Expand All @@ -225,7 +224,7 @@ fn generate_join_dataset_small(
)
.unwrap();

PyArrowType(batch)
Ok(PyArrowType(batch))
}

/**
Expand Down Expand Up @@ -253,7 +252,7 @@ fn generate_join_dataset_medium(
seed: i64,
keys_seed: i64,
batch_size: i64,
) -> PyArrowType<RecordBatch> {
) -> PyResult<PyArrowType<RecordBatch>> {
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
Expand All @@ -262,7 +261,7 @@ fn generate_join_dataset_medium(
let mut k2: Vec<i64> = (1..=(n * 11 / 10 / 1_000)).collect(); // original R line: key2 = split_xlr(N/1e3)
k2.shuffle(&mut keys_rng);

let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;

// orginial line (43:44)
// x = key[seq.int(1, n*0.9)],
Expand Down Expand Up @@ -292,8 +291,8 @@ fn generate_join_dataset_medium(
k2x.append(&mut k2r);

let item_capacity = batch_size as usize;
let len_of_max_k1_key = k1x.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
let len_of_max_k2_key = k2x.iter().max().unwrap().to_string().len() + 2; // the same
let len_of_max_k1_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec
let len_of_max_k2_key = (n * 11 / 10 / 1_000).to_string().len() + 2; // the same

let mut id1_builder = Int64Builder::with_capacity(item_capacity);
let mut id2_builder = Int64Builder::with_capacity(item_capacity);
Expand All @@ -304,8 +303,8 @@ fn generate_join_dataset_medium(
let mut v2_builder = Float64Builder::with_capacity(item_capacity);

for _i in 0..batch_size {
let k1 = k1x.choose(&mut rng).unwrap();
let k2 = k2x.choose(&mut rng).unwrap();
let k1 = k1x.choose(&mut rng).unwrap(); // we know 100% that k1x is non empty
let k2 = k2x.choose(&mut rng).unwrap(); // we know 100% that k2x is non empty

id1_builder.append_value(*k1);
id2_builder.append_value(*k2);
Expand Down Expand Up @@ -334,11 +333,11 @@ fn generate_join_dataset_medium(
)
.unwrap();

PyArrowType(batch)
Ok(PyArrowType(batch))
}

/**
Generate H2O join medium dataset.
Generate H2O join big dataset.
Running this function multiple time with the same seed
will constantly return exactly the same batch!
Expand Down Expand Up @@ -366,7 +365,7 @@ fn generate_join_dataset_big(
seed: i64,
keys_seed: i64,
batch_size: i64,
) -> PyArrowType<RecordBatch> {
) -> PyResult<PyArrowType<RecordBatch>> {
let mut rng = ChaCha8Rng::seed_from_u64(seed as u64);
let mut keys_rng = ChaCha8Rng::seed_from_u64(keys_seed as u64);
let mut k1: Vec<i64> = (1..=(n * 11 / 10 / 1_000_000)).collect(); // original R line: key1 = split_xlr(N/1e6)
Expand All @@ -378,8 +377,8 @@ fn generate_join_dataset_big(
let mut k3: Vec<i64> = (1..=(n * 11 / 10)).collect(); // original R line: key3 = split_xlr(N)
k3.shuffle(&mut keys_rng);

let distr_float = Uniform::<f64>::try_from(1.0..=100.0).unwrap();
let distr_nas = Uniform::<i64>::try_from(0..=100).unwrap();
let distr_float = Uniform::<f64>::try_from(1.0..=100.0)?;
let distr_nas = Uniform::<i64>::try_from(0..=100)?;

// orginial line (43:44)
// x = key[seq.int(1, n*0.9)],
Expand All @@ -404,9 +403,9 @@ fn generate_join_dataset_big(
.to_vec();

let item_capacity = batch_size as usize;
let len_of_max_k1_key = k1.iter().max().unwrap().to_string().len() + 2; // id{}, where {} is a number from a vec
let len_of_max_k2_key = k2.iter().max().unwrap().to_string().len() + 2; // the same
let len_of_max_k3_key = k3.iter().max().unwrap().to_string().len() + 2; // the same
let len_of_max_k1_key = (n * 11 / 10 / 1_000_000).to_string().len() + 2; // id{}, where {} is a number from a vec
let len_of_max_k2_key = (n * 11 / 10 / 1_000).to_string().len() + 2; // the same
let len_of_max_k3_key = (n * 11 / 10).to_string().len() + 2; // the same

let mut id1_builder = Int64Builder::with_capacity(item_capacity);
let mut id2_builder = Int64Builder::with_capacity(item_capacity);
Expand All @@ -420,6 +419,7 @@ fn generate_join_dataset_big(
let mut v2_builder = Float64Builder::with_capacity(item_capacity);

for _i in 0..batch_size {
// We exactly know at the moment that k1, k2, k3 are non empty
let k1 = k1.choose(&mut rng).unwrap();
let k2 = k2.choose(&mut rng).unwrap();
let k3 = k3.choose(&mut rng).unwrap();
Expand Down Expand Up @@ -473,7 +473,7 @@ fn generate_join_dataset_big(
)
.unwrap();

PyArrowType(batch)
Ok(PyArrowType(batch))
}

#[pymodule]
Expand Down

0 comments on commit 0a3d486

Please sign in to comment.