Remove extra schema parameter. Tweak shuffling benchmark
westonpace committed Aug 8, 2024
1 parent 2454f7e commit eea8d5b
Showing 5 changed files with 41 additions and 61 deletions.
12 changes: 8 additions & 4 deletions python/python/benchmarks/test_index.py
@@ -187,11 +187,15 @@ def test_transform_vectors_with_precomputed_parts(
 
 @pytest.mark.benchmark(group="shuffle_vectors")
 def test_shuffle_vectors(test_large_dataset, tmpdir, benchmark):
-    ivf = rand_ivf(test_dataset)
-    pq = rand_pq(test_dataset, ivf)
-    builder = IndicesBuilder(test_dataset, "vector")
+    ivf = rand_ivf(test_large_dataset)
+    pq = rand_pq(test_large_dataset, ivf)
+    builder = IndicesBuilder(test_large_dataset, "vector")
     transformed_uri = str(tmpdir / "output.lance")
-    builder.transform_vectors(ivf, pq, transformed_uri)
+    part_ids_path = str(tmpdir / "part_ids")
+    gen_rand_part_ids(test_large_dataset, part_ids_path)
+    print("Transforming vectors")
+    builder.transform_vectors(ivf, pq, transformed_uri, None, part_ids_path)
+    print("Shuffling")
     shuffle_out = str(tmpdir)
     benchmark.pedantic(
         builder.shuffle_transformed_vectors,
3 changes: 1 addition & 2 deletions rust/lance-index/src/vector/ivf/builder.rs
@@ -8,7 +8,6 @@ use std::sync::Arc;
 
 use arrow_array::cast::AsArray;
 use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
-use arrow_schema::SchemaRef;
 use futures::TryStreamExt;
 use object_store::path::Path;
 use snafu::{location, Location};
@@ -40,7 +39,7 @@ pub struct IvfBuildParams {
     /// requires `centroids` to be set
     ///
     /// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...])
-    pub precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 
     pub shuffle_partition_batches: usize,
 
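With the schema element gone, callers that supply precomputed shuffle buffers now pass only the buffer directory and file names, exactly as the doc comment above describes. A minimal sketch of assembling the new argument (the helper name is illustrative, not part of the Lance API):

use object_store::path::Path;

// Build the two-element tuple that `precomputed_shuffle_buffers` now
// expects: the buffer directory plus the buffer file names, with no
// `SchemaRef` attached.
fn precomputed_buffers_arg() -> Option<(Path, Vec<String>)> {
    let dir = Path::from("dir/to/buffers");
    let buffers = vec!["buffer1.lance".to_string(), "buffer2.lance".to_string()];
    Some((dir, buffers))
}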
75 changes: 26 additions & 49 deletions rust/lance-index/src/vector/ivf/shuffler.rs
@@ -23,7 +23,7 @@ use arrow::datatypes::UInt32Type;
 use arrow_array::{cast::AsArray, types::UInt64Type, Array, RecordBatch, UInt32Array};
 use arrow_array::{FixedSizeListArray, UInt8Array};
 use arrow_array::{ListArray, StructArray, UInt64Array};
-use arrow_schema::{DataType, Field, Fields, SchemaRef};
+use arrow_schema::{DataType, Field, Fields};
 use futures::stream::repeat_with;
 use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
 use lance_arrow::RecordBatchExt;
@@ -174,18 +174,14 @@ impl PartitionBuilder {
 
 struct PartitionListBuilder {
     partitions: Vec<Option<PartitionBuilder>>,
+    partition_sizes: Vec<u64>,
 }
 
 impl PartitionListBuilder {
-    fn new(schema: &arrow_schema::Schema, partition_sizes: Vec<u64>) -> Self {
+    fn new(partition_sizes: Vec<u64>) -> Self {
         Self {
-            partitions: Vec::from_iter(partition_sizes.into_iter().map(|part_size| {
-                if part_size == 0 {
-                    None
-                } else {
-                    Some(PartitionBuilder::new(schema, part_size as usize))
-                }
-            })),
+            partitions: Vec::default(),
+            partition_sizes,
         }
     }
 
@@ -194,6 +190,17 @@ impl PartitionListBuilder {
             return;
         }
 
+        if self.partitions.is_empty() {
+            let schema = batch.schema();
+            self.partitions = Vec::from_iter(self.partition_sizes.iter().map(|part_size| {
+                if *part_size == 0 {
+                    None
+                } else {
+                    Some(PartitionBuilder::new(schema.as_ref(), *part_size as usize))
+                }
+            }))
+        }
+
         let part_ids = batch
             .column_by_name(PART_ID_COLUMN)
             .unwrap()
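This hunk is the heart of the commit: PartitionListBuilder now starts with no partition builders and materializes them from the schema of the first non-empty batch it receives, so no caller has to thread a SchemaRef through the API. A self-contained sketch of the same lazy-initialization pattern, assuming the arrow-array and arrow-schema crates (LazyPartitions and feed are hypothetical stand-ins for PartitionListBuilder and its batch-consuming method):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};

// Hypothetical stand-in for `PartitionListBuilder`: per-partition state
// is created lazily, from the data itself.
struct LazyPartitions {
    partition_sizes: Vec<u64>,
    // Stand-in for `Vec<Option<PartitionBuilder>>`; here we only record
    // the schema each partition was initialized with.
    partitions: Vec<Option<SchemaRef>>,
}

impl LazyPartitions {
    fn new(partition_sizes: Vec<u64>) -> Self {
        Self {
            partition_sizes,
            partitions: Vec::default(),
        }
    }

    fn feed(&mut self, batch: &RecordBatch) {
        if batch.num_rows() == 0 {
            return;
        }
        // First non-empty batch: take the schema from the batch and
        // build state only for the partitions that will receive rows.
        if self.partitions.is_empty() {
            let schema = batch.schema();
            self.partitions = self
                .partition_sizes
                .iter()
                .map(|size| (*size > 0).then(|| schema.clone()))
                .collect();
        }
        // ...route the batch's rows to their partitions here...
    }
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    let mut parts = LazyPartitions::new(vec![0, 3]);
    parts.feed(&batch);
    // The empty partition stays `None`; the non-empty one was initialized.
    assert!(parts.partitions[0].is_none());
    assert!(parts.partitions[1].is_some());
}

Deferring construction until data arrives is what lets the schema parameter disappear from IvfBuildParams, shuffle_dataset, shuffle_vectors, and IvfShuffler in the rest of this diff.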
@@ -243,14 +250,14 @@ pub async fn shuffle_dataset(
     num_partitions: u32,
     shuffle_partition_batches: usize,
     shuffle_partition_concurrency: usize,
-    precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 ) -> Result<Vec<impl Stream<Item = Result<RecordBatch>>>> {
     // step 1: either use precomputed shuffle files or write shuffle data to a file
-    let shuffler = if let Some((path, buffers, schema)) = precomputed_shuffle_buffers {
+    let shuffler = if let Some((path, buffers)) = precomputed_shuffle_buffers {
         info!("Precomputed shuffle files provided, skip calculation of IVF partition.");
         let mut shuffler = IvfShuffler::try_new(num_partitions, Some(path), true)?;
         unsafe {
-            shuffler.set_unsorted_buffers(&buffers, schema);
+            shuffler.set_unsorted_buffers(&buffers);
         }
 
         shuffler
@@ -355,7 +362,6 @@
 
 pub async fn shuffle_vectors(
     filenames: Vec<String>,
-    unsorted_schema: SchemaRef,
     dir_path: Path,
     ivf_centroids: FixedSizeListArray,
 ) -> Result<Vec<String>> {
@@ -365,7 +371,7 @@
     let mut shuffler = IvfShuffler::try_new(num_partitions, Some(dir_path), false)?;
 
     unsafe {
-        shuffler.set_unsorted_buffers(&filenames, unsorted_schema);
+        shuffler.set_unsorted_buffers(&filenames);
     }
 
     let partition_files = shuffler
@@ -381,9 +387,6 @@ pub struct IvfShuffler {
 
     num_partitions: u32,
 
-    // The schema of the partition files. Only initialized after the unsorted parts are written
-    schema: Option<SchemaRef>,
-
     output_dir: Path,
 
     // whether the lance file is v1 (legacy) or v2
@@ -412,7 +415,6 @@ impl IvfShuffler {
             output_dir,
             unsorted_buffers: vec![],
             is_legacy,
-            schema: None,
         })
     }
 
@@ -421,13 +423,8 @@
     /// # Safety
     ///
     /// user must ensure the buffers are valid.
-    pub unsafe fn set_unsorted_buffers(
-        &mut self,
-        unsorted_buffers: &[impl ToString],
-        schema: SchemaRef,
-    ) {
+    pub unsafe fn set_unsorted_buffers(&mut self, unsorted_buffers: &[impl ToString]) {
         self.unsorted_buffers = unsorted_buffers.iter().map(|x| x.to_string()).collect();
-        self.schema = Some(schema);
     }
 
     pub async fn write_unsorted_stream(
@@ -448,7 +445,6 @@
                 return Err(Error::io("empty stream".to_string(), location!()));
             }
         };
-        self.schema = Some(schema.clone());
 
         // validate the schema,
         // we need to have row ID and partition ID column
@@ -478,7 +474,7 @@
         file_writer.finish().await?;
 
         unsafe {
-            self.set_unsorted_buffers(&[UNSORTED_BUFFER], schema);
+            self.set_unsorted_buffers(&[UNSORTED_BUFFER]);
         }
 
         Ok(())
@@ -577,15 +573,14 @@ impl IvfShuffler {
 
     async fn shuffle_to_partitions(
         &self,
-        schema: &arrow_schema::Schema,
         inputs: &[ShuffleInput],
         partition_size: Vec<u64>,
         num_batches_to_sort: usize,
     ) -> Result<Vec<ListArray>> {
         info!("Shuffling into memory");
 
         let mut num_processed = 0;
-        let mut partitions_builder = PartitionListBuilder::new(schema, partition_size);
+        let mut partitions_builder = PartitionListBuilder::new(partition_size);
 
         for &ShuffleInput {
             file_idx,
@@ -669,11 +664,6 @@
     ) -> Result<Vec<String>> {
         let num_batches = self.total_batches().await?;
         let total_batches = num_batches.iter().sum();
-        let schema = self
-            .schema
-            .as_ref()
-            .expect("unsorted data not written yet so schema not initialized")
-            .clone();
 
         info!(
             "Sorting unsorted data into sorted chunks (batches_per_chunk={} concurrent_jobs={})",
@@ -682,10 +672,8 @@
         stream::iter((0..total_batches).step_by(batches_per_partition))
             .zip(stream::repeat(num_batches))
             .map(|(i, num_batches)| {
-                let schema = schema.clone();
                 let this = self.clone();
                 tokio::spawn(async move {
-                    let schema = schema.clone();
                     // first, calculate which files and ranges needs to be processed
                     let start = i;
                     let end = std::cmp::min(i + batches_per_partition, total_batches);
@@ -729,12 +717,7 @@
 
                 // third, shuffle the data into each partition
                 let shuffled = this
-                    .shuffle_to_partitions(
-                        schema.as_ref(),
-                        &input,
-                        size_counts,
-                        num_batches_to_sort,
-                    )
+                    .shuffle_to_partitions(&input, size_counts, num_batches_to_sort)
                     .await?;
 
                 // finally, write the shuffled data to disk
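A side effect visible in this hunk and the two above it: since shuffle_to_partitions now takes its schema from the batches, nothing schema-related is cloned into the spawned tasks anymore. A runnable sketch of the fan-out pattern this function uses, assuming the tokio and futures crates (the chunk arithmetic mirrors the code above; the rest is illustrative):

use futures::{stream, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    let total_batches = 10usize;
    let batches_per_partition = 4usize;
    let concurrent_jobs = 2usize;

    // Fan chunks of batches out to tokio tasks, keeping at most
    // `concurrent_jobs` in flight, and collect their results in order.
    let results: Vec<usize> = stream::iter((0..total_batches).step_by(batches_per_partition))
        .map(|start| {
            tokio::spawn(async move {
                let end = std::cmp::min(start + batches_per_partition, total_batches);
                // ...shuffle batches[start..end] into partitions here...
                end - start
            })
        })
        .buffered(concurrent_jobs)
        .try_collect()
        .await?;

    assert_eq!(results, vec![4, 4, 2]);
    Ok(())
}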
@@ -1028,13 +1011,10 @@ mod test {
     #[tokio::test]
     async fn test_shuffler_multi_buffer_single_partition() {
         let (stream, mut shuffler) = make_stream_and_shuffler(false);
-        let unsorted_schema = stream.schema();
         shuffler.write_unsorted_stream(stream).await.unwrap();
 
         // set the same buffer twice we should get double the data
-        unsafe {
-            shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema)
-        }
+        unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) }
 
         let partition_files = shuffler.write_partitioned_shuffles(200, 1).await.unwrap();
 
@@ -1061,13 +1041,10 @@
     #[tokio::test]
     async fn test_shuffler_multi_buffer_multi_partition() {
         let (stream, mut shuffler) = make_stream_and_shuffler(false);
-        let unsorted_schema = stream.schema();
         shuffler.write_unsorted_stream(stream).await.unwrap();
 
         // set the same buffer twice we should get double the data
-        unsafe {
-            shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER], unsorted_schema)
-        }
+        unsafe { shuffler.set_unsorted_buffers(&[UNSORTED_BUFFER, UNSORTED_BUFFER]) }
 
         let partition_files = shuffler.write_partitioned_shuffles(1, 32).await.unwrap();
         assert_eq!(partition_files.len(), 200);
6 changes: 3 additions & 3 deletions rust/lance/src/index/vector/ivf.rs
@@ -16,7 +16,7 @@ use arrow_array::{
     Array, FixedSizeListArray, PrimitiveArray, RecordBatch, StructArray, UInt32Array,
 };
 use arrow_ord::sort::sort_to_indices;
-use arrow_schema::{DataType, Schema, SchemaRef};
+use arrow_schema::{DataType, Schema};
 use arrow_select::{concat::concat_batches, take::take};
 use async_trait::async_trait;
 use deepsize::DeepSizeOf;
@@ -1369,7 +1369,7 @@ async fn write_ivf_pq_file(
     precomputed_partitions: Option<HashMap<u64, u32>>,
     shuffle_partition_batches: usize,
     shuffle_partition_concurrency: usize,
-    precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 ) -> Result<()> {
     let path = index_dir.child(uuid).child(INDEX_FILE_NAME);
     let mut writer = object_store.create(&path).await?;
@@ -1427,7 +1427,7 @@ async fn write_ivf_hnsw_file(
     precomputed_partitions: Option<HashMap<u64, u32>>,
     shuffle_partition_batches: usize,
     shuffle_partition_concurrency: usize,
-    precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 ) -> Result<()> {
     let object_store = dataset.object_store();
     let path = dataset.indices_dir().child(uuid).child(INDEX_FILE_NAME);
6 changes: 3 additions & 3 deletions rust/lance/src/index/vector/ivf/builder.rs
@@ -8,7 +8,7 @@ use std::sync::Arc;
 use arrow::array::AsArray;
 use arrow::datatypes::UInt64Type;
 use arrow_array::{FixedSizeListArray, RecordBatch, UInt32Array, UInt64Array};
-use arrow_schema::{DataType, Field as ArrowField, SchemaRef};
+use arrow_schema::{DataType, Field as ArrowField};
 use futures::{StreamExt, TryStreamExt};
 use lance_arrow::{RecordBatchExt, SchemaExt};
 use lance_core::utils::address::RowAddress;
@@ -57,7 +57,7 @@ pub(super) async fn build_partitions(
     precomputed_partitons: Option<HashMap<u64, u32>>,
     shuffle_partition_batches: usize,
     shuffle_partition_concurrency: usize,
-    precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 ) -> Result<()> {
     let schema = data.schema();
     if schema.column_with_name(column).is_none() {
@@ -257,7 +257,7 @@ pub(super) async fn build_hnsw_partitions(
     precomputed_partitions: Option<HashMap<u64, u32>>,
     shuffle_partition_batches: usize,
     shuffle_partition_concurrency: usize,
-    precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,
+    precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
 ) -> Result<(Vec<HnswMetadata>, IvfModel)> {
     let schema = data.schema();
     if schema.column_with_name(column).is_none() {
