From eea8d5b082b2cc08b83e5dc7973ab8040df68445 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 8 Aug 2024 10:06:46 -0700 Subject: [PATCH] Remove extra schema parameter. Tweak shuffling benchmark --- python/python/benchmarks/test_index.py | 12 ++-- rust/lance-index/src/vector/ivf/builder.rs | 3 +- rust/lance-index/src/vector/ivf/shuffler.rs | 75 +++++++-------------- rust/lance/src/index/vector/ivf.rs | 6 +- rust/lance/src/index/vector/ivf/builder.rs | 6 +- 5 files changed, 41 insertions(+), 61 deletions(-) diff --git a/python/python/benchmarks/test_index.py b/python/python/benchmarks/test_index.py index c9777aefc1..4c0c2b3ff2 100644 --- a/python/python/benchmarks/test_index.py +++ b/python/python/benchmarks/test_index.py @@ -187,11 +187,15 @@ def test_transform_vectors_with_precomputed_parts( @pytest.mark.benchmark(group="shuffle_vectors") def test_shuffle_vectors(test_large_dataset, tmpdir, benchmark): - ivf = rand_ivf(test_dataset) - pq = rand_pq(test_dataset, ivf) - builder = IndicesBuilder(test_dataset, "vector") + ivf = rand_ivf(test_large_dataset) + pq = rand_pq(test_large_dataset, ivf) + builder = IndicesBuilder(test_large_dataset, "vector") transformed_uri = str(tmpdir / "output.lance") - builder.transform_vectors(ivf, pq, transformed_uri) + part_ids_path = str(tmpdir / "part_ids") + gen_rand_part_ids(test_large_dataset, part_ids_path) + print("Transforming vectors") + builder.transform_vectors(ivf, pq, transformed_uri, None, part_ids_path) + print("Shuffling") shuffle_out = str(tmpdir) benchmark.pedantic( builder.shuffle_transformed_vectors, diff --git a/rust/lance-index/src/vector/ivf/builder.rs b/rust/lance-index/src/vector/ivf/builder.rs index 24b3815bb5..79d48eec17 100644 --- a/rust/lance-index/src/vector/ivf/builder.rs +++ b/rust/lance-index/src/vector/ivf/builder.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use arrow_array::cast::AsArray; use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array}; -use arrow_schema::SchemaRef; use futures::TryStreamExt; use object_store::path::Path; use snafu::{location, Location}; @@ -40,7 +39,7 @@ pub struct IvfBuildParams { /// requires `centroids` to be set /// /// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...]) - pub precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + pub precomputed_shuffle_buffers: Option<(Path, Vec)>, pub shuffle_partition_batches: usize, diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index 427d4ed978..e07cb918c0 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -23,7 +23,7 @@ use arrow::datatypes::UInt32Type; use arrow_array::{cast::AsArray, types::UInt64Type, Array, RecordBatch, UInt32Array}; use arrow_array::{FixedSizeListArray, UInt8Array}; use arrow_array::{ListArray, StructArray, UInt64Array}; -use arrow_schema::{DataType, Field, Fields, SchemaRef}; +use arrow_schema::{DataType, Field, Fields}; use futures::stream::repeat_with; use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; @@ -174,18 +174,14 @@ impl PartitionBuilder { struct PartitionListBuilder { partitions: Vec>, + partition_sizes: Vec, } impl PartitionListBuilder { - fn new(schema: &arrow_schema::Schema, partition_sizes: Vec) -> Self { + fn new(partition_sizes: Vec) -> Self { Self { - partitions: Vec::from_iter(partition_sizes.into_iter().map(|part_size| { - if part_size == 0 { - None - } else { - Some(PartitionBuilder::new(schema, part_size as usize)) - } - })), + partitions: Vec::default(), + partition_sizes, } } @@ -194,6 +190,17 @@ impl PartitionListBuilder { return; } + if self.partitions.is_empty() { + let schema = batch.schema(); + self.partitions = Vec::from_iter(self.partition_sizes.iter().map(|part_size| { + if *part_size == 0 { + None + } else { + Some(PartitionBuilder::new(schema.as_ref(), *part_size as usize)) + } + })) + } + let part_ids = batch .column_by_name(PART_ID_COLUMN) .unwrap() @@ -243,14 +250,14 @@ pub async fn shuffle_dataset( num_partitions: u32, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + precomputed_shuffle_buffers: Option<(Path, Vec)>, ) -> Result>>> { // step 1: either use precomputed shuffle files or write shuffle data to a file - let shuffler = if let Some((path, buffers, schema)) = precomputed_shuffle_buffers { + let shuffler = if let Some((path, buffers)) = precomputed_shuffle_buffers { info!("Precomputed shuffle files provided, skip calculation of IVF partition."); let mut shuffler = IvfShuffler::try_new(num_partitions, Some(path), true)?; unsafe { - shuffler.set_unsorted_buffers(&buffers, schema); + shuffler.set_unsorted_buffers(&buffers); } shuffler @@ -355,7 +362,6 @@ pub async fn shuffle_dataset( pub async fn shuffle_vectors( filenames: Vec, - unsorted_schema: SchemaRef, dir_path: Path, ivf_centroids: FixedSizeListArray, ) -> Result> { @@ -365,7 +371,7 @@ pub async fn shuffle_vectors( let mut shuffler = IvfShuffler::try_new(num_partitions, Some(dir_path), false)?; unsafe { - shuffler.set_unsorted_buffers(&filenames, unsorted_schema); + shuffler.set_unsorted_buffers(&filenames); } let partition_files = shuffler @@ -381,9 +387,6 @@ pub struct IvfShuffler { num_partitions: u32, - // The schema of the partition files. Only initialized after the unsorted parts are written - schema: Option, - output_dir: Path, // whether the lance file is v1 (legacy) or v2 @@ -412,7 +415,6 @@ impl IvfShuffler { output_dir, unsorted_buffers: vec![], is_legacy, - schema: None, }) } @@ -421,13 +423,8 @@ impl IvfShuffler { /// # Safety /// /// user must ensure the buffers are valid. - pub unsafe fn set_unsorted_buffers( - &mut self, - unsorted_buffers: &[impl ToString], - schema: SchemaRef, - ) { + pub unsafe fn set_unsorted_buffers(&mut self, unsorted_buffers: &[impl ToString]) { self.unsorted_buffers = unsorted_buffers.iter().map(|x| x.to_string()).collect(); - self.schema = Some(schema); } pub async fn write_unsorted_stream( @@ -448,7 +445,6 @@ impl IvfShuffler { return Err(Error::io("empty stream".to_string(), location!())); } }; - self.schema = Some(schema.clone()); // validate the schema, // we need to have row ID and partition ID column @@ -478,7 +474,7 @@ impl IvfShuffler { file_writer.finish().await?; unsafe { - self.set_unsorted_buffers(&[UNSORTED_BUFFER], schema); + self.set_unsorted_buffers(&[UNSORTED_BUFFER]); } Ok(()) @@ -577,7 +573,6 @@ impl IvfShuffler { async fn shuffle_to_partitions( &self, - schema: &arrow_schema::Schema, inputs: &[ShuffleInput], partition_size: Vec, num_batches_to_sort: usize, @@ -585,7 +580,7 @@ impl IvfShuffler { info!("Shuffling into memory"); let mut num_processed = 0; - let mut partitions_builder = PartitionListBuilder::new(schema, partition_size); + let mut partitions_builder = PartitionListBuilder::new(partition_size); for &ShuffleInput { file_idx, @@ -669,11 +664,6 @@ impl IvfShuffler { ) -> Result> { let num_batches = self.total_batches().await?; let total_batches = num_batches.iter().sum(); - let schema = self - .schema - .as_ref() - .expect("unsorted data not written yet so schema not initialized") - .clone(); info!( "Sorting unsorted data into sorted chunks (batches_per_chunk={} concurrent_jobs={})", @@ -682,10 +672,8 @@ impl IvfShuffler { stream::iter((0..total_batches).step_by(batches_per_partition)) .zip(stream::repeat(num_batches)) .map(|(i, num_batches)| { - let schema = schema.clone(); let this = self.clone(); tokio::spawn(async move { - let schema = schema.clone(); // first, calculate which files and ranges needs to be processed let start = i; let end = std::cmp::min(i + batches_per_partition, total_batches); @@ -729,12 +717,7 @@ impl IvfShuffler { // third, shuffle the data into each partition let shuffled = this - .shuffle_to_partitions( - schema.as_ref(), - &input, - size_counts, - num_batches_to_sort, - ) + .shuffle_to_partitions(&input, size_counts, num_batches_to_sort) .await?; // finally, write the shuffled data to disk @@ -1028,13 +1011,10 @@ mod test { #[tokio::test] async fn test_shuffler_multi_buffer_single_partition() { let (stream, mut shuffler) = make_stream_and_shuffler(false); - let unsorted_schema = stream.schema(); shuffler.write_unsorted_stream(stream).await.unwrap(); // set the same buffer twice we should get double the data - unsafe { - shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema) - } + unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) } let partition_files = shuffler.write_partitioned_shuffles(200, 1).await.unwrap(); @@ -1061,13 +1041,10 @@ mod test { #[tokio::test] async fn test_shuffler_multi_buffer_multi_partition() { let (stream, mut shuffler) = make_stream_and_shuffler(false); - let unsorted_schema = stream.schema(); shuffler.write_unsorted_stream(stream).await.unwrap(); // set the same buffer twice we should get double the data - unsafe { - shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema) - } + unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) } let partition_files = shuffler.write_partitioned_shuffles(1, 32).await.unwrap(); assert_eq!(partition_files.len(), 200); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 553f021623..e18d85ae9b 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -16,7 +16,7 @@ use arrow_array::{ Array, FixedSizeListArray, PrimitiveArray, RecordBatch, StructArray, UInt32Array, }; use arrow_ord::sort::sort_to_indices; -use arrow_schema::{DataType, Schema, SchemaRef}; +use arrow_schema::{DataType, Schema}; use arrow_select::{concat::concat_batches, take::take}; use async_trait::async_trait; use deepsize::DeepSizeOf; @@ -1369,7 +1369,7 @@ async fn write_ivf_pq_file( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + precomputed_shuffle_buffers: Option<(Path, Vec)>, ) -> Result<()> { let path = index_dir.child(uuid).child(INDEX_FILE_NAME); let mut writer = object_store.create(&path).await?; @@ -1427,7 +1427,7 @@ async fn write_ivf_hnsw_file( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + precomputed_shuffle_buffers: Option<(Path, Vec)>, ) -> Result<()> { let object_store = dataset.object_store(); let path = dataset.indices_dir().child(uuid).child(INDEX_FILE_NAME); diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index 5f9734a6ed..ac3114a4c0 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::datatypes::UInt64Type; use arrow_array::{FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array}; -use arrow_schema::{DataType, Field as ArrowField, SchemaRef}; +use arrow_schema::{DataType, Field as ArrowField}; use futures::{StreamExt, TryStreamExt}; use lance_arrow::{RecordBatchExt, SchemaExt}; use lance_core::utils::address::RowAddress; @@ -57,7 +57,7 @@ pub(super) async fn build_partitions( precomputed_partitons: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + precomputed_shuffle_buffers: Option<(Path, Vec)>, ) -> Result<()> { let schema = data.schema(); if schema.column_with_name(column).is_none() { @@ -257,7 +257,7 @@ pub(super) async fn build_hnsw_partitions( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, + precomputed_shuffle_buffers: Option<(Path, Vec)>, ) -> Result<(Vec, IvfModel)> { let schema = data.schema(); if schema.column_with_name(column).is_none() {