Skip to content

Commit

Permalink
Use v2 to write sorted part files
Browse files Browse the repository at this point in the history
Avoid expensive concatenation by building partitions in-place

Parallelize shuffling
  • Loading branch information
westonpace committed Aug 8, 2024
1 parent 34a9803 commit 2454f7e
Show file tree
Hide file tree
Showing 6 changed files with 478 additions and 195 deletions.
46 changes: 27 additions & 19 deletions rust/lance-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use prost::Message;
use prost_types::Any;
use snafu::{location, Location};
use tokio::io::AsyncWriteExt;
use tracing::instrument;

use crate::datatypes::FieldsWithMeta;
use crate::format::pb;
Expand Down Expand Up @@ -175,6 +176,7 @@ impl FileWriter {
Ok(())
}

#[instrument(skip_all, level = "debug")]
async fn write_pages(
&mut self,
mut encoding_tasks: FuturesUnordered<EncodeTask>,
Expand Down Expand Up @@ -263,6 +265,30 @@ impl FileWriter {
Ok(self.schema.as_ref().unwrap())
}

#[instrument(skip_all, level = "debug")]
fn encode_batch(&mut self, batch: &RecordBatch) -> Result<Vec<Vec<EncodeTask>>> {
self.schema
.as_ref()
.unwrap()
.fields
.iter()
.zip(self.column_writers.iter_mut())
.map(|(field, column_writer)| {
let array = batch
.column_by_name(&field.name)
.ok_or(Error::InvalidInput {
source: format!(
"Cannot write batch. The batch was missing the column `{}`",
field.name
)
.into(),
location: location!(),
})?;
column_writer.maybe_encode(array.clone())
})
.collect::<Result<Vec<_>>>()
}

/// Schedule a batch of data to be written to the file
///
/// Note: the future returned by this method may complete before the data has been fully
Expand All @@ -273,7 +299,6 @@ impl FileWriter {
batch.get_array_memory_size()
);
self.ensure_initialized(batch)?;
let schema = self.schema.as_ref().unwrap();
let num_rows = batch.num_rows() as u64;
if num_rows == 0 {
return Ok(());
Expand All @@ -292,24 +317,7 @@ impl FileWriter {
};
// First we push each array into its column writer. This may or may not generate enough
// data to trigger an encoding task. We collect any encoding tasks into a queue.
let encoding_tasks = schema
.fields
.iter()
.zip(self.column_writers.iter_mut())
.map(|(field, column_writer)| {
let array = batch
.column_by_name(&field.name)
.ok_or(Error::InvalidInput {
source: format!(
"Cannot write batch. The batch was missing the column `{}`",
field.name
)
.into(),
location: location!(),
})?;
column_writer.maybe_encode(array.clone())
})
.collect::<Result<Vec<_>>>()?;
let encoding_tasks = self.encode_batch(batch)?;
let encoding_tasks = encoding_tasks
.into_iter()
.flatten()
Expand Down
1 change: 1 addition & 0 deletions rust/lance-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ criterion.workspace = true
lance-datagen.workspace = true
lance-testing.workspace = true
tempfile.workspace = true
test-log.workspace = true
datafusion-sql.workspace = true
random_word = { version = "0.4.3", features = ["en"] }

Expand Down
3 changes: 2 additions & 1 deletion rust/lance-index/src/vector/ivf/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::{Array, FixedSizeListArray, UInt32Array, UInt64Array};
use arrow_schema::SchemaRef;
use futures::TryStreamExt;
use object_store::path::Path;
use snafu::{location, Location};
Expand Down Expand Up @@ -39,7 +40,7 @@ pub struct IvfBuildParams {
/// requires `centroids` to be set
///
/// The input is expected to be (/dir/to/buffers, [buffer1.lance, buffer2.lance, ...])
pub precomputed_shuffle_buffers: Option<(Path, Vec<String>)>,
pub precomputed_shuffle_buffers: Option<(Path, Vec<String>, SchemaRef)>,

pub shuffle_partition_batches: usize,

Expand Down
Loading

0 comments on commit 2454f7e

Please sign in to comment.