Skip to content

Commit

Permalink
perf: split MaterializeIndex stream into batches (#2770)
Browse files Browse the repository at this point in the history
Fixes #2768

After these changes, we can do the query using <2GB of RAM instead of
33GB! 🚀

<img width="1407" alt="Screenshot 2024-08-21 at 3 41 10 PM"
src="https://github.com/user-attachments/assets/a89c4519-4fd8-492e-8786-01be28264dc1">
  • Loading branch information
wjones127 committed Sep 4, 2024
1 parent c509ed3 commit 9c42903
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions rust/lance/src/io/exec/scalar_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use lance_core::{
},
Error, Result, ROW_ID_FIELD,
};
use lance_datafusion::chunker::break_stream;
use lance_index::{
scalar::{
expression::{ScalarIndexExpr, ScalarIndexLoader},
Expand Down Expand Up @@ -409,7 +410,6 @@ impl MaterializeIndexExec {
dataset: Arc<Dataset>,
fragments: Arc<Vec<Fragment>>,
) -> Result<RecordBatch> {
// TODO: multiple batches, stream without materializing all row ids in memory
let mask = expr.evaluate(dataset.as_ref());
let span = debug_span!("create_prefilter");
let prefilter = span.in_scope(|| {
Expand Down Expand Up @@ -582,9 +582,14 @@ impl ExecutionPlan for MaterializeIndexExec {
.then(|batch_fut| batch_fut.map_err(|err| err.into()))
.boxed()
as BoxStream<'static, datafusion::common::Result<RecordBatch>>;
Ok(Box::pin(RecordBatchStreamAdapter::new(
let stream = Box::pin(RecordBatchStreamAdapter::new(
MATERIALIZE_INDEX_SCHEMA.clone(),
stream,
));
let stream = break_stream(stream, 64 * 1024);
Ok(Box::pin(RecordBatchStreamAdapter::new(
MATERIALIZE_INDEX_SCHEMA.clone(),
stream.map_err(|err| err.into()),
)))
}

Expand Down

0 comments on commit 9c42903

Please sign in to comment.