From 1d074d2fedeb13c0b03fc84dd8ebb42ebe5bec79 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 9 Aug 2024 10:10:47 -0700 Subject: [PATCH] Move load_partitioned_shuffles to class method --- python/src/indices.rs | 4 +- rust/lance-index/src/vector/ivf/shuffler.rs | 55 +++++++++++---------- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/python/src/indices.rs b/python/src/indices.rs index 07f82fdd03..36206318a7 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -6,7 +6,7 @@ use arrow_array::{Array, FixedSizeListArray}; use arrow_data::ArrayData; use lance::index::vector::ivf::builder::write_vector_storage; use lance::io::ObjectStore; -use lance_index::vector::ivf::shuffler::{load_partitioned_shuffles, shuffle_vectors}; +use lance_index::vector::ivf::shuffler::shuffle_vectors; use lance_index::vector::{ ivf::{storage::IvfModel, IvfBuildParams}, pq::{PQBuildParams, ProductQuantizer}, @@ -300,7 +300,7 @@ async fn do_load_shuffled_vectors( pq_model: ProductQuantizer, ) -> PyResult<()> { let (_, path) = object_store_from_uri_or_path(dir_path).await?; - let streams = load_partitioned_shuffles(path.clone(), filenames) + let streams = IvfShuffler::load_partitioned_shuffles(&path, filenames) .await .infer_error()?; diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index bc9b2b0d11..5197a88463 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -354,7 +354,8 @@ pub async fn shuffle_dataset( // step 3: load the sorted chunks, consumers are expect to be responsible for merging the streams let start = std::time::Instant::now(); - let stream = shuffler.load_partitioned_shuffles(partition_files).await?; + let stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files).await?; info!("merged partitioned shuffles in {:?}", start.elapsed()); Ok(stream) @@ -785,7 +786,7 @@ impl IvfShuffler { } pub async fn load_partitioned_shuffles( - &self, + basedir: &Path, files: Vec, ) -> Result>>> { // impl RecordBatchStream @@ -793,7 +794,7 @@ impl IvfShuffler { for file in files { let object_store = Arc::new(ObjectStore::local()); - let path = self.output_dir.child(file); + let path = basedir.child(file); let scan_scheduler = ScanScheduler::new( object_store, SchedulerConfig::fast_and_not_too_ram_intensive(), @@ -958,10 +959,10 @@ mod test { assert_eq!(partition_files.len(), 1); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; let mut stream = result_stream.pop().unwrap(); @@ -983,10 +984,10 @@ mod test { assert_eq!(partition_files.len(), 1); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; let mut stream = result_stream.pop().unwrap(); @@ -1008,10 +1009,10 @@ mod test { assert_eq!(partition_files.len(), 100); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; result_stream.reverse(); @@ -1038,10 +1039,10 @@ mod test { assert_eq!(partition_files.len(), 1); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; result_stream.reverse(); @@ -1067,10 +1068,10 @@ mod test { let partition_files = shuffler.write_partitioned_shuffles(1, 32).await.unwrap(); assert_eq!(partition_files.len(), 200); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; result_stream.reverse(); @@ -1145,10 +1146,10 @@ mod test { assert_eq!(partition_files.len(), expected_num_part_files as usize); - let mut result_stream = shuffler - .load_partitioned_shuffles(partition_files) - .await - .unwrap(); + let mut result_stream = + IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files) + .await + .unwrap(); let mut num_batches = 0; result_stream.reverse();