Skip to content

Commit

Permalink
Move load_partitioned_shuffles to class method
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Aug 9, 2024
1 parent f3719a4 commit 1d074d2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 29 deletions.
4 changes: 2 additions & 2 deletions python/src/indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_array::{Array, FixedSizeListArray};
use arrow_data::ArrayData;
use lance::index::vector::ivf::builder::write_vector_storage;
use lance::io::ObjectStore;
use lance_index::vector::ivf::shuffler::{load_partitioned_shuffles, shuffle_vectors};
use lance_index::vector::ivf::shuffler::shuffle_vectors;
use lance_index::vector::{
ivf::{storage::IvfModel, IvfBuildParams},
pq::{PQBuildParams, ProductQuantizer},
Expand Down Expand Up @@ -300,7 +300,7 @@ async fn do_load_shuffled_vectors(
pq_model: ProductQuantizer,
) -> PyResult<()> {
let (_, path) = object_store_from_uri_or_path(dir_path).await?;
let streams = load_partitioned_shuffles(path.clone(), filenames)
let streams = IvfShuffler::load_partitioned_shuffles(&path, filenames)
.await
.infer_error()?;

Expand Down
55 changes: 28 additions & 27 deletions rust/lance-index/src/vector/ivf/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ pub async fn shuffle_dataset(

// step 3: load the sorted chunks; consumers are expected to be responsible for merging the streams
let start = std::time::Instant::now();
let stream = shuffler.load_partitioned_shuffles(partition_files).await?;
let stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files).await?;
info!("merged partitioned shuffles in {:?}", start.elapsed());

Ok(stream)
Expand Down Expand Up @@ -785,15 +786,15 @@ impl IvfShuffler {
}

pub async fn load_partitioned_shuffles(
&self,
basedir: &Path,
files: Vec<String>,
) -> Result<Vec<impl Stream<Item = Result<RecordBatch>>>> {
// impl RecordBatchStream
let mut streams = vec![];

for file in files {
let object_store = Arc::new(ObjectStore::local());
let path = self.output_dir.child(file);
let path = basedir.child(file);
let scan_scheduler = ScanScheduler::new(
object_store,
SchedulerConfig::fast_and_not_too_ram_intensive(),
Expand Down Expand Up @@ -958,10 +959,10 @@ mod test {

assert_eq!(partition_files.len(), 1);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
let mut stream = result_stream.pop().unwrap();
Expand All @@ -983,10 +984,10 @@ mod test {

assert_eq!(partition_files.len(), 1);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
let mut stream = result_stream.pop().unwrap();
Expand All @@ -1008,10 +1009,10 @@ mod test {

assert_eq!(partition_files.len(), 100);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
result_stream.reverse();
Expand All @@ -1038,10 +1039,10 @@ mod test {

assert_eq!(partition_files.len(), 1);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
result_stream.reverse();
Expand All @@ -1067,10 +1068,10 @@ mod test {
let partition_files = shuffler.write_partitioned_shuffles(1, 32).await.unwrap();
assert_eq!(partition_files.len(), 200);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
result_stream.reverse();
Expand Down Expand Up @@ -1145,10 +1146,10 @@ mod test {

assert_eq!(partition_files.len(), expected_num_part_files as usize);

let mut result_stream = shuffler
.load_partitioned_shuffles(partition_files)
.await
.unwrap();
let mut result_stream =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
.await
.unwrap();

let mut num_batches = 0;
result_stream.reverse();
Expand Down

0 comments on commit 1d074d2

Please sign in to comment.