From 2454f7e872c951d1fea596434426218e9b732b4e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 19 Jun 2024 08:10:24 -0700 Subject: [PATCH] Use v2 to write sorted part files Avoid expensive concatenation by building partitions in-place Parallelize shuffling --- rust/lance-file/src/v2/writer.rs | 46 +- rust/lance-index/Cargo.toml | 1 + rust/lance-index/src/vector/ivf/builder.rs | 3 +- rust/lance-index/src/vector/ivf/shuffler.rs | 611 ++++++++++++++------ rust/lance/src/index/vector/ivf.rs | 6 +- rust/lance/src/index/vector/ivf/builder.rs | 6 +- 6 files changed, 478 insertions(+), 195 deletions(-) diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs index 2f46380837..3481df8884 100644 --- a/rust/lance-file/src/v2/writer.rs +++ b/rust/lance-file/src/v2/writer.rs @@ -24,6 +24,7 @@ use prost::Message; use prost_types::Any; use snafu::{location, Location}; use tokio::io::AsyncWriteExt; +use tracing::instrument; use crate::datatypes::FieldsWithMeta; use crate::format::pb; @@ -175,6 +176,7 @@ impl FileWriter { Ok(()) } + #[instrument(skip_all, level = "debug")] async fn write_pages( &mut self, mut encoding_tasks: FuturesUnordered, @@ -263,6 +265,30 @@ impl FileWriter { Ok(self.schema.as_ref().unwrap()) } + #[instrument(skip_all, level = "debug")] + fn encode_batch(&mut self, batch: &RecordBatch) -> Result>> { + self.schema + .as_ref() + .unwrap() + .fields + .iter() + .zip(self.column_writers.iter_mut()) + .map(|(field, column_writer)| { + let array = batch + .column_by_name(&field.name) + .ok_or(Error::InvalidInput { + source: format!( + "Cannot write batch. The batch was missing the column `{}`", + field.name + ) + .into(), + location: location!(), + })?; + column_writer.maybe_encode(array.clone()) + }) + .collect::>>() + } + /// Schedule a batch of data to be written to the file /// /// Note: the future returned by this method may complete before the data has been fully @@ -273,7 +299,6 @@ impl FileWriter { batch.get_array_memory_size() ); self.ensure_initialized(batch)?; - let schema = self.schema.as_ref().unwrap(); let num_rows = batch.num_rows() as u64; if num_rows == 0 { return Ok(()); @@ -292,24 +317,7 @@ impl FileWriter { }; // First we push each array into its column writer. This may or may not generate enough // data to trigger an encoding task. We collect any encoding tasks into a queue. - let encoding_tasks = schema - .fields - .iter() - .zip(self.column_writers.iter_mut()) - .map(|(field, column_writer)| { - let array = batch - .column_by_name(&field.name) - .ok_or(Error::InvalidInput { - source: format!( - "Cannot write batch. 
The batch was missing the column `{}`", - field.name - ) - .into(), - location: location!(), - })?; - column_writer.maybe_encode(array.clone()) - }) - .collect::>>()?; + let encoding_tasks = self.encode_batch(batch)?; let encoding_tasks = encoding_tasks .into_iter() .flatten() diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index a15dd089d6..01cb010c56 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -64,6 +64,7 @@ criterion.workspace = true lance-datagen.workspace = true lance-testing.workspace = true tempfile.workspace = true +test-log.workspace = true datafusion-sql.workspace = true random_word = { version = "0.4.3", features = ["en"] } diff --git a/rust/lance-index/src/vector/ivf/builder.rs b/rust/lance-index/src/vector/ivf/builder.rs index 79d48eec17..24b3815bb5 100644 --- a/rust/lance-index/src/vector/ivf/builder.rs +++ b/rust/lance-index/src/vector/ivf/builder.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use arrow_array::cast::AsArray; use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array}; +use arrow_schema::SchemaRef; use futures::TryStreamExt; use object_store::path::Path; use snafu::{location, Location}; @@ -39,7 +40,7 @@ pub struct IvfBuildParams { /// requires `centroids` to be set /// /// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...]) - pub precomputed_shuffle_buffers: Option<(Path, Vec)>, + pub precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, pub shuffle_partition_batches: usize, diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index 9a7669216f..427d4ed978 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -14,17 +14,24 @@ use std::collections::HashMap; use std::sync::Arc; +use arrow::array::{ + ArrayBuilder, FixedSizeListBuilder, StructBuilder, UInt32Builder, UInt64Builder, UInt8Builder, +}; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::compute::sort_to_indices; -use arrow_array::FixedSizeListArray; +use arrow::datatypes::UInt32Type; use arrow_array::{cast::AsArray, types::UInt64Type, Array, RecordBatch, UInt32Array}; -use arrow_schema::Field; +use arrow_array::{FixedSizeListArray, UInt8Array}; +use arrow_array::{ListArray, StructArray, UInt64Array}; +use arrow_schema::{DataType, Field, Fields, SchemaRef}; use futures::stream::repeat_with; -use futures::{stream, Stream, StreamExt, TryStreamExt}; +use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; use lance_core::{datatypes::Schema, Error, Result, ROW_ID}; -use lance_encoding::decoder::FilterExpression; +use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::reader::FileReader; use lance_file::v2::reader::FileReader as Lancev2FileReader; +use lance_file::v2::writer::FileWriterOptions; use lance_file::writer::FileWriter; use lance_io::object_store::ObjectStore; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; @@ -54,6 +61,161 @@ fn get_temp_dir() -> Result { Ok(tmp_dir_path) } +/// A builder for a partition of data +/// +/// After we sort a batch of data into partitions we append those slices into this builder. 
+/// +/// The builder is pre-allocated and so this extend operation should only be a memcpy +#[derive(Debug)] +struct PartitionBuilder { + builder: StructBuilder, +} + +// Fork of arrow_array::builder::make_builder that handles FixedSizeList >_< +// +// Not really suitable for upstreaming because FixedSizeListBuilder> is +// awkward and the entire make_builder function needs some overhaul (dyn ArrayBuilder should have +// an extend(array: &dyn Array) method). +fn make_builder(datatype: &DataType, capacity: usize) -> Box { + if let DataType::FixedSizeList(inner, dim) = datatype { + let inner_builder = + arrow_array::builder::make_builder(inner.data_type(), capacity * (*dim) as usize); + Box::new(FixedSizeListBuilder::new(inner_builder, *dim)) + } else { + arrow_array::builder::make_builder(datatype, capacity) + } +} + +// Fork of StructBuilder::from_fields that handles FixedSizeList >_< +fn from_fields(fields: impl Into, capacity: usize) -> StructBuilder { + let fields = fields.into(); + let mut builders = Vec::with_capacity(fields.len()); + for field in &fields { + builders.push(make_builder(field.data_type(), capacity)); + } + StructBuilder::new(fields, builders) +} + +impl PartitionBuilder { + fn new(schema: &arrow_schema::Schema, initial_capacity: usize) -> Self { + let builder = from_fields(schema.fields.clone(), initial_capacity); + Self { builder } + } + + fn extend(&mut self, batch: &RecordBatch) { + for _ in 0..batch.num_rows() { + self.builder.append(true); + } + let schema = batch.schema_ref(); + for (field_idx, (col, field)) in batch.columns().iter().zip(schema.fields()).enumerate() { + match field.data_type() { + DataType::UInt32 => { + let col = col.as_any().downcast_ref::().unwrap(); + self.builder + .field_builder::(field_idx) + .unwrap() + .append_slice(col.values()); + } + DataType::UInt64 => { + let col = col.as_any().downcast_ref::().unwrap(); + self.builder + .field_builder::(field_idx) + .unwrap() + .append_slice(col.values()); + } + DataType::FixedSizeList(inner, _) => { + let col = col.as_any().downcast_ref::().unwrap(); + match inner.data_type() { + DataType::UInt8 => { + let values = + col.values().as_any().downcast_ref::().unwrap(); + let fsl_builder = self + .builder + .field_builder::>>( + field_idx, + ) + .unwrap(); + // TODO: Upstream an append_many to FSL builder + for _ in 0..col.len() { + fsl_builder.append(true); + } + fsl_builder + .values() + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_slice(values.values()); + } + _ => panic!("Unexpected fixed size list item type in shuffled file"), + } + } + _ => panic!("Unexpected column type in shuffled file"), + } + } + } + + // Convert the partition builder into a list array with 1 row + fn finish(mut self) -> Result { + let struct_array = Arc::new(self.builder.finish()); + + let item_field = Arc::new(Field::new("item", struct_array.data_type().clone(), true)); + + Ok(ListArray::try_new( + item_field, + OffsetBuffer::new(ScalarBuffer::::from(vec![ + 0, + struct_array.len() as i32, + ])), + struct_array, + None, + )?) 
+ } +} + +struct PartitionListBuilder { + partitions: Vec>, +} + +impl PartitionListBuilder { + fn new(schema: &arrow_schema::Schema, partition_sizes: Vec) -> Self { + Self { + partitions: Vec::from_iter(partition_sizes.into_iter().map(|part_size| { + if part_size == 0 { + None + } else { + Some(PartitionBuilder::new(schema, part_size as usize)) + } + })), + } + } + + fn extend(&mut self, batch: &RecordBatch) { + if batch.num_rows() == 0 { + return; + } + + let part_ids = batch + .column_by_name(PART_ID_COLUMN) + .unwrap() + .as_primitive::(); + + let part_id = part_ids.value(0) as usize; + + let builder = &mut self.partitions[part_id]; + builder + .as_mut() + .expect("partition size was zero but received data for partition") + .extend(batch); + } + + fn finish(self) -> Result> { + self.partitions + .into_iter() + .filter_map(|builder| builder.map(|builder| builder.finish())) + .collect() + } +} + /// Disk-based shuffle for a stream of [RecordBatch] into each IVF partition. /// Sub-quantizer will be applied if provided. /// @@ -81,14 +243,14 @@ pub async fn shuffle_dataset( num_partitions: u32, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec)>, + precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, ) -> Result>>> { // step 1: either use precomputed shuffle files or write shuffle data to a file - let shuffler = if let Some((path, buffers)) = precomputed_shuffle_buffers { + let shuffler = if let Some((path, buffers, schema)) = precomputed_shuffle_buffers { info!("Precomputed shuffle files provided, skip calculation of IVF partition."); let mut shuffler = IvfShuffler::try_new(num_partitions, Some(path), true)?; unsafe { - shuffler.set_unsorted_buffers(&buffers); + shuffler.set_unsorted_buffers(&buffers, schema); } shuffler @@ -193,6 +355,7 @@ pub async fn shuffle_dataset( pub async fn shuffle_vectors( filenames: Vec, + unsorted_schema: SchemaRef, dir_path: Path, ivf_centroids: FixedSizeListArray, ) -> Result> { @@ -202,7 +365,7 @@ pub async fn shuffle_vectors( let mut shuffler = IvfShuffler::try_new(num_partitions, Some(dir_path), false)?; unsafe { - shuffler.set_unsorted_buffers(&filenames); + shuffler.set_unsorted_buffers(&filenames, unsorted_schema); } let partition_files = shuffler @@ -212,11 +375,15 @@ pub async fn shuffle_vectors( Ok(partition_files) } +#[derive(Clone)] pub struct IvfShuffler { unsorted_buffers: Vec, num_partitions: u32, + // The schema of the partition files. Only initialized after the unsorted parts are written + schema: Option, + output_dir: Path, // whether the lance file is v1 (legacy) or v2 @@ -245,6 +412,7 @@ impl IvfShuffler { output_dir, unsorted_buffers: vec![], is_legacy, + schema: None, }) } @@ -253,8 +421,13 @@ impl IvfShuffler { /// # Safety /// /// user must ensure the buffers are valid. 
- pub unsafe fn set_unsorted_buffers(&mut self, unsorted_buffers: &[impl ToString]) { + pub unsafe fn set_unsorted_buffers( + &mut self, + unsorted_buffers: &[impl ToString], + schema: SchemaRef, + ) { self.unsorted_buffers = unsorted_buffers.iter().map(|x| x.to_string()).collect(); + self.schema = Some(schema); } pub async fn write_unsorted_stream( @@ -275,6 +448,7 @@ impl IvfShuffler { return Err(Error::io("empty stream".to_string(), location!())); } }; + self.schema = Some(schema.clone()); // validate the schema, // we need to have row ID and partition ID column @@ -304,7 +478,7 @@ impl IvfShuffler { file_writer.finish().await?; unsafe { - self.set_unsorted_buffers(&[UNSORTED_BUFFER]); + self.set_unsorted_buffers(&[UNSORTED_BUFFER], schema); } Ok(()) @@ -401,47 +575,17 @@ impl IvfShuffler { Ok(partition_sizes) } - async fn process_batch_in_shuffle( - batch: RecordBatch, - partitioned_batches: &mut [Vec], - ) -> Result<()> { - let part_ids: &UInt32Array = batch - .column_by_name(PART_ID_COLUMN) - .expect("Partition ID column not found") - .as_primitive(); - let indices = sort_to_indices(&part_ids, None, None)?; - let batch = batch.take(&indices)?; - - let mut start = 0; - while start < batch.num_rows() { - let part_id = part_ids.value(indices.value(start) as usize); - let mut end = start + 1; - while end < batch.num_rows() && part_ids.value(indices.value(end) as usize) == part_id { - end += 1; - } - - let part_batches = &mut partitioned_batches[part_id as usize]; - part_batches.push(batch.slice(start, end - start)); - start = end; - } - - Ok(()) - } - async fn shuffle_to_partitions( &self, + schema: &arrow_schema::Schema, inputs: &[ShuffleInput], partition_size: Vec, num_batches_to_sort: usize, - ) -> Result>> { - let mut partitioned_batches = Vec::with_capacity(partition_size.len()); - for _ in 0..partition_size.len() { - partitioned_batches.push(Vec::new()); - } - + ) -> Result> { info!("Shuffling into memory"); let mut num_processed = 0; + let mut partitions_builder = PartitionListBuilder::new(schema, partition_size); for &ShuffleInput { file_idx, @@ -452,31 +596,19 @@ impl IvfShuffler { let object_store = ObjectStore::local(); let file_name = &self.unsorted_buffers[file_idx]; let path = self.output_dir.child(file_name.as_str()); + let mut _reader_handle = None; - if self.is_legacy { - let reader = FileReader::try_new_self_described(&object_store, &path, None).await?; + let mut stream = if self.is_legacy { + _reader_handle = + Some(FileReader::try_new_self_described(&object_store, &path, None).await?); - let mut stream = stream::iter(start..end) + stream::iter(start..end) .map(|i| { + let reader = _reader_handle.as_ref().unwrap(); reader.read_batch(i as i32, ReadBatchParams::RangeFull, reader.schema()) }) - .buffered(16); - - while let Some(batch) = stream.next().await { - if num_processed % 100 == 0 { - info!("Shuffle Progress {}/{}", num_processed, num_batches_to_sort); - } - num_processed += 1; - - let batch = batch?; - - // skip empty batches - if batch.num_rows() == 0 { - continue; - } - - Self::process_batch_in_shuffle(batch, &mut partitioned_batches).await?; - } + .buffered(16) + .boxed() } else { let scheduler = ScanScheduler::new( Arc::new(object_store), @@ -484,7 +616,7 @@ impl IvfShuffler { ); let file = scheduler.open_file(&path).await?; let reader = Lancev2FileReader::try_open(file, None, Default::default()).await?; - let mut stream = reader + reader .read_stream( lance_io::ReadBatchParams::Range( (start * SHUFFLE_BATCH_SIZE)..(end * SHUFFLE_BATCH_SIZE), @@ -492,28 
+624,42 @@ impl IvfShuffler { SHUFFLE_BATCH_SIZE as u32, 16, FilterExpression::no_filter(), - ) - .unwrap(); + )? + .boxed() + }; - while let Some(batch) = stream.next().await { - if num_processed % 100 == 0 { - info!("Shuffle Progress {}/{}", num_processed, num_batches_to_sort); - } - num_processed += 1; + while let Some(batch) = stream.next().await { + if num_processed % 100 == 0 { + info!("Shuffle Progress {}/{}", num_processed, num_batches_to_sort); + } + num_processed += 1; - let batch = batch?; + let batch = batch?; - // skip empty batches - if batch.num_rows() == 0 { - continue; - } + if batch.num_rows() == 0 { + continue; + } + + let part_ids: &UInt32Array = batch[PART_ID_COLUMN].as_primitive(); + let indices = sort_to_indices(&part_ids, None, None)?; + let batch = batch.take(&indices)?; + + let sorted_part_ids: &UInt32Array = batch[PART_ID_COLUMN].as_primitive(); - Self::process_batch_in_shuffle(batch, &mut partitioned_batches).await?; + let mut start = 0; + let mut prev_id = sorted_part_ids.value(0); + for (idx, part_id) in sorted_part_ids.values().iter().enumerate() { + if *part_id != prev_id { + partitions_builder.extend(&batch.slice(start, idx - start)); + start = idx; + prev_id = *part_id; + } } + partitions_builder.extend(&batch.slice(start, sorted_part_ids.len() - start)); } } - Ok(partitioned_batches) + partitions_builder.finish() } pub async fn write_partitioned_shuffles( @@ -523,6 +669,11 @@ impl IvfShuffler { ) -> Result> { let num_batches = self.total_batches().await?; let total_batches = num_batches.iter().sum(); + let schema = self + .schema + .as_ref() + .expect("unsorted data not written yet so schema not initialized") + .clone(); info!( "Sorting unsorted data into sorted chunks (batches_per_chunk={} concurrent_jobs={})", @@ -530,87 +681,102 @@ impl IvfShuffler { ); stream::iter((0..total_batches).step_by(batches_per_partition)) .zip(stream::repeat(num_batches)) - .map(|(i, num_batches)| async move { - // first, calculate which files and ranges needs to be processed - let start = i; - let end = std::cmp::min(i + batches_per_partition, total_batches); - let num_batches_to_sort = end - start; - let mut input = vec![]; - - let mut cumulative_size = 0; - for (file_idx, partition_size) in num_batches.iter().enumerate() { - let cur_start = cumulative_size; - let cur_end = cumulative_size + partition_size; - - cumulative_size += partition_size; - - let should_include_file = start < cur_end && end > cur_start; + .map(|(i, num_batches)| { + let schema = schema.clone(); + let this = self.clone(); + tokio::spawn(async move { + let schema = schema.clone(); + // first, calculate which files and ranges needs to be processed + let start = i; + let end = std::cmp::min(i + batches_per_partition, total_batches); + let num_batches_to_sort = end - start; + let mut input = vec![]; + + let mut cumulative_size = 0; + for (file_idx, partition_size) in num_batches.iter().enumerate() { + let cur_start = cumulative_size; + let cur_end = cumulative_size + partition_size; + + cumulative_size += partition_size; + + let should_include_file = start < cur_end && end > cur_start; + + if !should_include_file { + continue; + } - if !should_include_file { - continue; - } + // the currnet part doesn't overlap with the current batch + if start >= cur_end { + continue; + } - // the currnet part doesn't overlap with the current batch - if start >= cur_end { - continue; + let local_start = if start < cur_start { + 0 + } else { + start - cur_start + }; + let local_end = std::cmp::min(end - cur_start, 
*partition_size); + + input.push(ShuffleInput { + file_idx, + start: local_start, + end: local_end, + }); } - let local_start = if start < cur_start { - 0 - } else { - start - cur_start - }; - let local_end = std::cmp::min(end - cur_start, *partition_size); - - input.push(ShuffleInput { - file_idx, - start: local_start, - end: local_end, - }); - } - - // second, count the number of rows in each partition - let size_counts = self.count_partition_size(&input).await?; - - // third, shuffle the data into each partition - let shuffled = self - .shuffle_to_partitions(&input, size_counts, num_batches_to_sort) - .await?; - let schema = shuffled - .iter() - .find(|batches| !batches.is_empty()) - .ok_or(Error::io("empty input to shuffle".to_owned(), location!()))?[0] - .schema(); - - // finally, write the shuffled data to disk - let object_store = ObjectStore::local(); - let output_file = format!("sorted_{}.lance", i); - let path = self.output_dir.child(output_file.clone()); - let writer = object_store.create(&path).await?; - - info!( - "Chunk loaded into memory and sorted, writing to disk at {}", - path - ); - - // TODO: The result can be lance v1 or v2. Currently it is v1 - let mut file_writer = FileWriter::::with_object_writer( - writer, - Schema::try_from(schema.as_ref())?, - &Default::default(), - )?; - - for batches_and_idx in shuffled.into_iter().enumerate() { - let (idx, batches) = batches_and_idx; - if idx % 1000 == 0 { - info!("Writing partition {}/{}", idx, self.num_partitions); + // second, count the number of rows in each partition + let size_counts = this.count_partition_size(&input).await?; + + // third, shuffle the data into each partition + let shuffled = this + .shuffle_to_partitions( + schema.as_ref(), + &input, + size_counts, + num_batches_to_sort, + ) + .await?; + + // finally, write the shuffled data to disk + let object_store = ObjectStore::local(); + let output_file = format!("sorted_{}.lance", i); + let path = this.output_dir.child(output_file.clone()); + let writer = object_store.create(&path).await?; + + info!( + "Chunk loaded into memory and sorted, writing to disk at {}", + path + ); + + let sorted_file_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "partitions", + shuffled.first().unwrap().data_type().clone(), + true, + )])); + let lance_schema = Schema::try_from(sorted_file_schema.as_ref())?; + let mut file_writer = lance_file::v2::writer::FileWriter::try_new( + writer, + lance_schema, + FileWriterOptions::default(), + )?; + + for partition_and_idx in shuffled.into_iter().enumerate() { + let (idx, partition) = partition_and_idx; + if idx % 1000 == 0 { + info!("Writing partition {}/{}", idx, this.num_partitions); + } + let batch = RecordBatch::try_new( + sorted_file_schema.clone(), + vec![Arc::new(partition)], + )?; + file_writer.write_batch(&batch).await?; } - file_writer.write(&batches).await?; - } - file_writer.finish().await?; + file_writer.finish().await?; - Ok(output_file) as Result + Ok(output_file) as Result + }) + .map(|join_res| join_res.unwrap()) }) .buffered(concurrent_jobs) .try_collect() @@ -625,19 +791,42 @@ impl IvfShuffler { let mut streams = vec![]; for file in files { - let object_store = ObjectStore::local(); + let object_store = Arc::new(ObjectStore::local()); let path = self.output_dir.child(file); - let reader = FileReader::try_new_self_described(&object_store, &path, None).await?; - let reader = Arc::new(reader); - - let stream = stream::iter(0..reader.num_batches()) - .zip(stream::repeat(reader)) - .map(|(i, reader)| async move { 
- reader - .read_batch(i as i32, ReadBatchParams::RangeFull, reader.schema()) - .await - }) - .buffered(4); + let scan_scheduler = ScanScheduler::new( + object_store, + SchedulerConfig::fast_and_not_too_ram_intensive(), + ); + let file_scheduler = scan_scheduler.open_file(&path).await?; + let reader = lance_file::v2::reader::FileReader::try_open( + file_scheduler, + None, + DecoderMiddlewareChain::default(), + ) + .await?; + let stream = reader + .read_stream( + ReadBatchParams::RangeFull, + /*batch_size=*/ 1, + /*batch_readahead=*/ 32, + FilterExpression::no_filter(), + )? + .and_then(|batch| { + let list_array = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("ListArray expected"); + let struct_array = list_array + .values() + .as_any() + .downcast_ref::() + .expect("StructArray expected") + .clone(); + let batch: RecordBatch = struct_array.into(); + std::future::ready(Ok(batch)) + }); + streams.push(stream); } @@ -655,12 +844,13 @@ mod test { use lance_arrow::FixedSizeListArrayExt; use lance_core::ROW_ID_FIELD; use lance_io::stream::RecordBatchStreamAdapter; + use rand::RngCore; use crate::vector::PQ_CODE_COLUMN; use super::*; - fn make_schema() -> Arc { + fn make_schema(pq_dim: u32) -> Arc { Arc::new(arrow_schema::Schema::new(vec![ ROW_ID_FIELD.clone(), arrow_schema::Field::new(PART_ID_COLUMN, DataType::UInt32, true), @@ -668,7 +858,7 @@ mod test { PQ_CODE_COLUMN, DataType::FixedSizeList( Arc::new(arrow_schema::Field::new("item", DataType::UInt8, true)), - 32, + pq_dim as i32, ), false, ), @@ -678,7 +868,7 @@ mod test { fn make_stream_and_shuffler( include_empty_batches: bool, ) -> (impl RecordBatchStream, IvfShuffler) { - let schema = make_schema(); + let schema = make_schema(32); let schema2 = schema.clone(); @@ -838,10 +1028,13 @@ mod test { #[tokio::test] async fn test_shuffler_multi_buffer_single_partition() { let (stream, mut shuffler) = make_stream_and_shuffler(false); + let unsorted_schema = stream.schema(); shuffler.write_unsorted_stream(stream).await.unwrap(); // set the same buffer twice we should get double the data - unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) } + unsafe { + shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema) + } let partition_files = shuffler.write_partitioned_shuffles(200, 1).await.unwrap(); @@ -868,10 +1061,13 @@ mod test { #[tokio::test] async fn test_shuffler_multi_buffer_multi_partition() { let (stream, mut shuffler) = make_stream_and_shuffler(false); + let unsorted_schema = stream.schema(); shuffler.write_unsorted_stream(stream).await.unwrap(); // set the same buffer twice we should get double the data - unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) } + unsafe { + shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema) + } let partition_files = shuffler.write_partitioned_shuffles(1, 32).await.unwrap(); assert_eq!(partition_files.len(), 200); @@ -893,4 +1089,81 @@ mod test { assert_eq!(num_batches, 200); } + + fn make_big_stream_and_shuffler( + num_batches: u32, + num_partitions: u32, + pq_dim: u32, + ) -> (impl RecordBatchStream, IvfShuffler) { + let schema = make_schema(pq_dim); + + let schema2 = schema.clone(); + + let stream = stream::iter(0..num_batches).map(move |idx| { + let mut rng = rand::thread_rng(); + let row_ids = Arc::new(UInt64Array::from_iter( + (idx * 1024..(idx + 1) * 1024).map(u64::from), + )); + + let part_id = Arc::new(UInt32Array::from_iter( + (idx * 1024..(idx + 1) * 1024).map(|_| rng.next_u32() 
% num_partitions), + )); + + let values = Arc::new(UInt8Array::from_iter((0..pq_dim * 1024).map(|_| idx as u8))); + let pq_codes = Arc::new( + FixedSizeListArray::try_new_from_values(values as Arc, pq_dim as i32) + .unwrap(), + ); + + Ok(RecordBatch::try_new(schema2.clone(), vec![row_ids, part_id, pq_codes]).unwrap()) + }); + + let stream = RecordBatchStreamAdapter::new(schema.clone(), stream); + + let shuffler = IvfShuffler::try_new(num_partitions, None, true).unwrap(); + + (stream, shuffler) + } + + // Change NUM_BATCHES = 1000 * 1024 and NUM_PARTITIONS to 35000 to test 1B shuffle + const NUM_BATCHES: u32 = 1 * 100; + const NUM_PARTITIONS: u32 = 1000; + const PQ_DIM: u32 = 48; + const BATCHES_PER_PARTITION: u32 = 10200; + const NUM_CONCURRENT_JOBS: u32 = 16; + + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn test_big_shuffle() { + let (stream, mut shuffler) = + make_big_stream_and_shuffler(NUM_BATCHES, NUM_PARTITIONS, PQ_DIM); + + shuffler.write_unsorted_stream(stream).await.unwrap(); + let partition_files = shuffler + .write_partitioned_shuffles( + BATCHES_PER_PARTITION as usize, + NUM_CONCURRENT_JOBS as usize, + ) + .await + .unwrap(); + + let expected_num_part_files = NUM_BATCHES.div_ceil(BATCHES_PER_PARTITION); + + assert_eq!(partition_files.len(), expected_num_part_files as usize); + + let mut result_stream = shuffler + .load_partitioned_shuffles(partition_files) + .await + .unwrap(); + + let mut num_batches = 0; + result_stream.reverse(); + + while let Some(mut stream) = result_stream.pop() { + while let Some(_) = stream.next().await { + num_batches += 1 + } + } + + assert_eq!(num_batches, NUM_PARTITIONS * expected_num_part_files); + } } diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index e18d85ae9b..553f021623 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -16,7 +16,7 @@ use arrow_array::{ Array, FixedSizeListArray, PrimitiveArray, RecordBatch, StructArray, UInt32Array, }; use arrow_ord::sort::sort_to_indices; -use arrow_schema::{DataType, Schema}; +use arrow_schema::{DataType, Schema, SchemaRef}; use arrow_select::{concat::concat_batches, take::take}; use async_trait::async_trait; use deepsize::DeepSizeOf; @@ -1369,7 +1369,7 @@ async fn write_ivf_pq_file( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec)>, + precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, ) -> Result<()> { let path = index_dir.child(uuid).child(INDEX_FILE_NAME); let mut writer = object_store.create(&path).await?; @@ -1427,7 +1427,7 @@ async fn write_ivf_hnsw_file( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec)>, + precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, ) -> Result<()> { let object_store = dataset.object_store(); let path = dataset.indices_dir().child(uuid).child(INDEX_FILE_NAME); diff --git a/rust/lance/src/index/vector/ivf/builder.rs b/rust/lance/src/index/vector/ivf/builder.rs index ac3114a4c0..5f9734a6ed 100644 --- a/rust/lance/src/index/vector/ivf/builder.rs +++ b/rust/lance/src/index/vector/ivf/builder.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use arrow::array::AsArray; use arrow::datatypes::UInt64Type; use arrow_array::{FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array}; -use arrow_schema::{DataType, Field as ArrowField}; +use arrow_schema::{DataType, Field 
as ArrowField, SchemaRef}; use futures::{StreamExt, TryStreamExt}; use lance_arrow::{RecordBatchExt, SchemaExt}; use lance_core::utils::address::RowAddress; @@ -57,7 +57,7 @@ pub(super) async fn build_partitions( precomputed_partitons: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec)>, + precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, ) -> Result<()> { let schema = data.schema(); if schema.column_with_name(column).is_none() { @@ -257,7 +257,7 @@ pub(super) async fn build_hnsw_partitions( precomputed_partitions: Option>, shuffle_partition_batches: usize, shuffle_partition_concurrency: usize, - precomputed_shuffle_buffers: Option<(Path, Vec)>, + precomputed_shuffle_buffers: Option<(Path, Vec, SchemaRef)>, ) -> Result<(Vec, IvfModel)> { let schema = data.schema(); if schema.column_with_name(column).is_none() {
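
The core of the in-place partition build in this patch is: count rows per partition first, pre-allocate each partition's builder to that size, then sort every incoming batch by PART_ID_COLUMN and append each contiguous run of equal partition ids as a single slice, instead of collecting per-partition batch vectors and concatenating them later. Below is a minimal, self-contained sketch of that pattern using plain Vecs in place of the Arrow StructBuilder that PartitionBuilder uses; the function and variable names are illustrative only, not part of the patch.

// Simplified sketch of "build partitions in place": pre-allocate, sort by partition id,
// then flush contiguous runs as slices. Plain Vecs stand in for Arrow builders.
fn shuffle_rows_in_place(
    part_ids: &[u32],          // partition id per row (parallel to `rows`)
    rows: &[u64],              // payload for each row (e.g. a row id)
    partition_sizes: &[usize], // pre-counted rows per partition, used to pre-allocate
) -> Vec<Vec<u64>> {
    // Pre-allocate every partition up front so each extend below is effectively a memcpy,
    // mirroring PartitionBuilder's pre-sized builders.
    let mut partitions: Vec<Vec<u64>> = partition_sizes
        .iter()
        .map(|sz| Vec::with_capacity(*sz))
        .collect();

    // Sort row indices by partition id (the patch uses arrow's sort_to_indices).
    let mut order: Vec<usize> = (0..part_ids.len()).collect();
    order.sort_by_key(|&i| part_ids[i]);

    // Walk the sorted order and append each contiguous run of equal partition ids
    // to its partition as one slice, instead of pushing row by row.
    let mut start = 0;
    while start < order.len() {
        let part_id = part_ids[order[start]] as usize;
        let mut end = start + 1;
        while end < order.len() && part_ids[order[end]] as usize == part_id {
            end += 1;
        }
        partitions[part_id].extend(order[start..end].iter().map(|&i| rows[i]));
        start = end;
    }
    partitions
}

fn main() {
    let part_ids = vec![2, 0, 2, 1, 0];
    let rows = vec![10, 11, 12, 13, 14];
    let parts = shuffle_rows_in_place(&part_ids, &rows, &[2, 1, 2]);
    assert_eq!(parts, vec![vec![11, 14], vec![13], vec![10, 12]]);
}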
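The parallelization in write_partitioned_shuffles wraps each chunk's sort-and-write work in tokio::spawn so chunks run on separate runtime threads, while buffered(concurrent_jobs) bounds how many chunks are in flight, and the JoinHandle result is unwrapped so task panics surface. A minimal sketch of that concurrency pattern, assuming the tokio (rt-multi-thread, macros) and futures crates; the function name and the chunk body are placeholders, not the actual shuffle logic.

use futures::{stream, FutureExt, StreamExt, TryStreamExt};

// Spawn one task per chunk, cap in-flight chunks with buffered(), collect the output files.
async fn process_chunks_in_parallel(
    num_chunks: usize,
    concurrent_jobs: usize,
) -> Result<Vec<String>, String> {
    stream::iter(0..num_chunks)
        .map(|chunk_idx| {
            tokio::spawn(async move {
                // ... load chunk `chunk_idx`, sort it into partitions, write it out ...
                Ok::<_, String>(format!("sorted_{}.lance", chunk_idx))
            })
            // Propagate task panics the same way the patch does: unwrap the join result.
            .map(|join_res| join_res.unwrap())
        })
        .buffered(concurrent_jobs)
        .try_collect()
        .await
}

#[tokio::main]
async fn main() {
    let files = process_chunks_in_parallel(10, 4).await.unwrap();
    assert_eq!(files.len(), 10);
}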
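In the v2 sorted files, each partition is written as one row of a list-of-struct "partitions" column, so load_partitioned_shuffles can read with a batch size of 1 and unwrap each list row's StructArray back into a RecordBatch. A small sketch of that wrap/unwrap, assuming the arrow-array, arrow-buffer, and arrow-schema crates; the schema and field names here are illustrative.

use std::sync::Arc;

use arrow_array::{Array, ListArray, RecordBatch, StructArray, UInt64Array};
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Schema};

// Writing side: a whole partition becomes a single row of a List<Struct> column.
fn wrap_partition_as_list_row(partition: StructArray) -> ListArray {
    let item_field = Arc::new(Field::new("item", partition.data_type().clone(), true));
    let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, partition.len() as i32]));
    ListArray::new(item_field, offsets, Arc::new(partition), None)
}

// Reading side: peel the single list row back into a normal RecordBatch, as the
// `and_then` adapter in load_partitioned_shuffles does.
fn unwrap_partition(list: &ListArray) -> RecordBatch {
    let structs = list
        .values()
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("StructArray expected")
        .clone();
    structs.into()
}

fn main() {
    let row_ids = Arc::new(UInt64Array::from(vec![1_u64, 2, 3]));
    let partition = StructArray::from(vec![(
        Arc::new(Field::new("row_id", DataType::UInt64, false)),
        row_ids as Arc<dyn Array>,
    )]);

    let list = wrap_partition_as_list_row(partition);
    assert_eq!(list.len(), 1); // one list row == one partition

    let batch = unwrap_partition(&list);
    assert_eq!(batch.num_rows(), 3);

    // The sorted file's schema holds this list column under a single field.
    let _schema = Schema::new(vec![Field::new("partitions", list.data_type().clone(), true)]);
}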