feat: make scalar index training configurable (#2686)
Closes #2661.

This PR adds a `scan_unordered_chunks` method to the `TrainingSource`
trait and provides basic implementations where required.
`train_bitmap_index` has been updated to take advantage of this new
method. Please advise if any other indexing algorithms do not require
sorted columns and I'll update accordingly.
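
For reference, the `TrainingSource` trait after this change exposes both entry points side by side. This is a condensed view assembled from the btree.rs diff below, not new API; the `#[async_trait]` attribute is assumed from the crate's existing style:

#[async_trait]
pub trait TrainingSource: Send {
    /// Returns a stream of batches ordered by the value column
    async fn scan_ordered_chunks(
        self: Box<Self>,
        chunk_size: u32,
    ) -> Result<SendableRecordBatchStream>;

    /// Returns a stream of batches in no particular order, for indices
    /// (such as bitmap) that do not need sorted input
    async fn scan_unordered_chunks(
        self: Box<Self>,
        chunk_size: u32,
    ) -> Result<SendableRecordBatchStream>;
}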
dsgibbons committed Aug 5, 2024
1 parent 9c292ac commit 2480164
Showing 6 changed files with 87 additions and 17 deletions.
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/bitmap.rs
@@ -339,7 +339,7 @@ pub async fn train_bitmap_index(
     data_source: Box<dyn TrainingSource + Send>,
     index_store: &dyn IndexStore,
 ) -> Result<()> {
-    let batches_source = data_source.scan_ordered_chunks(4096).await?;
+    let batches_source = data_source.scan_unordered_chunks(4096).await?;
 
     // mapping from item to list of the row ids where it is present
     let dictionary: HashMap<ScalarValue, RowIdTreeMap> = HashMap::new();
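
Why the bitmap index can take the unordered path: training is a commutative fold that maps each value to the set of row ids containing it, so the order in which batches arrive cannot change the result. A minimal, self-contained sketch of that property (plain `i64` and `BTreeSet<u64>` stand in for `ScalarValue` and `RowIdTreeMap`; this is an illustration, not the Lance implementation):

use std::collections::{BTreeSet, HashMap};

fn build_dictionary(batches: &[Vec<(i64, u64)>]) -> HashMap<i64, BTreeSet<u64>> {
    let mut dictionary: HashMap<i64, BTreeSet<u64>> = HashMap::new();
    for batch in batches {
        for &(value, row_id) in batch {
            // Inserting into a per-value set is order-independent.
            dictionary.entry(value).or_default().insert(row_id);
        }
    }
    dictionary
}

fn main() {
    let ordered = vec![vec![(1, 10), (1, 11)], vec![(2, 12)]];
    let shuffled = vec![vec![(2, 12)], vec![(1, 11), (1, 10)]];
    // Same dictionary either way, which is what lets train_bitmap_index
    // switch to scan_unordered_chunks.
    assert_eq!(build_dictionary(&ordered), build_dictionary(&shuffled));
}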
20 changes: 20 additions & 0 deletions rust/lance-index/src/scalar/btree.rs
@@ -1064,6 +1064,18 @@ pub trait TrainingSource: Send {
         self: Box<Self>,
         chunk_size: u32,
     ) -> Result<SendableRecordBatchStream>;
+
+    /// Returns a stream of batches
+    ///
+    /// Each batch should have chunk_size rows
+    ///
+    /// The schema for the batch is slightly flexible.
+    /// The first column may have any name or type, these are the values to index
+    /// The second column must be the row ids which must be UInt64Type
+    async fn scan_unordered_chunks(
+        self: Box<Self>,
+        chunk_size: u32,
+    ) -> Result<SendableRecordBatchStream>;
 }
 
 /// Train a btree index from a stream of sorted page-size batches of values and row ids
@@ -1153,6 +1165,14 @@ impl TrainingSource for BTreeUpdater {
         )?;
         Ok(chunk_concat_stream(unchunked, chunk_size as usize))
     }
+
+    async fn scan_unordered_chunks(
+        self: Box<Self>,
+        _chunk_size: u32,
+    ) -> Result<SendableRecordBatchStream> {
+        // BTree indices will never use unordered scans
+        unimplemented!()
+    }
 }
 
 /// A stream that reads the original training data back out of the index
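
The new doc comment fixes the batch contract: the first column holds the values to index under any name and type, and the second column must be UInt64 row ids. A sketch of a conforming batch built with the arrow-rs API (the column names and the Int32 value type are arbitrary choices, as the contract allows):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // First column: the values to index (any name, any type).
    let values: ArrayRef = Arc::new(Int32Array::from(vec![7, 3, 7]));
    // Second column: the row ids, which must be UInt64.
    let row_ids: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
    let schema = Arc::new(Schema::new(vec![
        Field::new("values", DataType::Int32, true),
        Field::new("row_ids", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![values, row_ids]).unwrap();
    assert_eq!(batch.num_rows(), 3);
}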
37 changes: 26 additions & 11 deletions rust/lance-index/src/scalar/label_list.rs
@@ -1,12 +1,13 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
-use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
+use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc};
 
 use arrow::array::AsArray;
 use arrow_array::{Array, RecordBatch, UInt64Array};
 use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
 use async_trait::async_trait;
+use datafusion::execution::RecordBatchStream;
 use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
 use datafusion_common::ScalarValue;
 use deepsize::DeepSizeOf;
@@ -271,17 +272,31 @@ impl TrainingSource for UnnestTrainingSource {
         chunk_size: u32,
     ) -> Result<SendableRecordBatchStream> {
         let source = self.source.scan_ordered_chunks(chunk_size).await?;
-        let unnest_schema = unnest_schema(source.schema().as_ref());
-        let unnest_schema_copy = unnest_schema.clone();
-        let source = source.try_filter_map(move |batch| {
-            std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose())
-        });
-
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            unnest_schema_copy.clone(),
-            source,
-        )))
+        unnest_chunks(source)
     }
+
+    async fn scan_unordered_chunks(
+        self: Box<Self>,
+        chunk_size: u32,
+    ) -> Result<SendableRecordBatchStream> {
+        let source = self.source.scan_unordered_chunks(chunk_size).await?;
+        unnest_chunks(source)
+    }
 }
+
+fn unnest_chunks(
+    source: Pin<Box<dyn RecordBatchStream + Send>>,
+) -> Result<SendableRecordBatchStream> {
+    let unnest_schema = unnest_schema(source.schema().as_ref());
+    let unnest_schema_copy = unnest_schema.clone();
+    let source = source.try_filter_map(move |batch| {
+        std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose())
+    });
+
+    Ok(Box::pin(RecordBatchStreamAdapter::new(
+        unnest_schema_copy.clone(),
+        source,
+    )))
+}
 
 /// Trains a new label list index
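
Both scan paths in `UnnestTrainingSource` now funnel into the extracted `unnest_chunks` helper, whose core is `try_filter_map` from the futures crate: each `Ok` batch is mapped to `Ok(Some(transformed))` to keep it, `Ok(None)` to drop it, or `Err` to fail the whole stream. A self-contained toy of that contract, assuming only the futures crate (the closure stands in for `unnest_batch`):

use futures::executor::block_on;
use futures::stream::{self, TryStreamExt};

fn main() {
    let batches = stream::iter(vec![Ok::<i32, String>(1), Ok(2), Ok(3)]);
    let kept: Vec<i32> = block_on(
        batches
            // Keep odd items (transformed), drop even ones.
            .try_filter_map(|x| std::future::ready(Ok((x % 2 == 1).then_some(x * 10))))
            .try_collect(),
    )
    .unwrap();
    assert_eq!(kept, vec![10, 30]);
}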
7 changes: 7 additions & 0 deletions rust/lance-index/src/scalar/lance_format.rs
@@ -336,6 +336,13 @@ mod tests {
         ) -> Result<SendableRecordBatchStream> {
             Ok(self.data)
         }
+
+        async fn scan_unordered_chunks(
+            self: Box<Self>,
+            _chunk_size: u32,
+        ) -> Result<SendableRecordBatchStream> {
+            Ok(self.data)
+        }
     }
 
     async fn train_index(
7 changes: 7 additions & 0 deletions rust/lance/benches/scalar_index.rs
@@ -50,6 +50,13 @@ impl TrainingSource for BenchmarkDataSource {
     ) -> Result<SendableRecordBatchStream> {
         Ok(reader_to_stream(Box::new(Self::test_data())))
     }
+
+    async fn scan_unordered_chunks(
+        self: Box<Self>,
+        _chunk_size: u32,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(reader_to_stream(Box::new(Self::test_data())))
+    }
 }
 
 impl BenchmarkFixture {
31 changes: 26 additions & 5 deletions rust/lance/src/index/scalar.rs
@@ -38,22 +38,43 @@ impl TrainingSource for TrainingRequest {
     async fn scan_ordered_chunks(
         self: Box<Self>,
         chunk_size: u32,
     ) -> Result<SendableRecordBatchStream> {
+        self.scan_chunks(chunk_size, true).await
+    }
+
+    async fn scan_unordered_chunks(
+        self: Box<Self>,
+        chunk_size: u32,
+    ) -> Result<SendableRecordBatchStream> {
+        self.scan_chunks(chunk_size, false).await
+    }
+}
+
+impl TrainingRequest {
+    async fn scan_chunks(
+        self: Box<Self>,
+        chunk_size: u32,
+        sort: bool,
+    ) -> Result<SendableRecordBatchStream> {
         let mut scan = self.dataset.scan();
 
+        let ordering = match sort {
+            true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]),
+            false => None,
+        };
+
         let scan = scan
             .with_row_id()
-            .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
-                self.column.clone(),
-            )]))?
+            .order_by(ordering)?
             .project(&[&self.column])?;
 
-        let ordered_batches = scan
+        let batches = scan
             .try_into_dfstream(LanceExecutionOptions {
                 use_spilling: true,
                 ..Default::default()
             })
             .await?;
-        Ok(chunk_concat_stream(ordered_batches, chunk_size as usize))
+        Ok(chunk_concat_stream(batches, chunk_size as usize))
     }
 }
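
The shape of this last refactor is two thin trait methods over one private helper, with the ordering decision reduced to a parameter that either does or does not hand an ordering to `order_by`. A synchronous toy with hypothetical names, showing the same pattern without the async and stream machinery of the real code:

trait Source {
    fn scan_ordered(self: Box<Self>) -> Vec<u64>;
    fn scan_unordered(self: Box<Self>) -> Vec<u64>;
}

struct Request {
    rows: Vec<u64>,
}

impl Source for Request {
    fn scan_ordered(self: Box<Self>) -> Vec<u64> {
        self.scan(true)
    }
    fn scan_unordered(self: Box<Self>) -> Vec<u64> {
        self.scan(false)
    }
}

impl Request {
    // Shared implementation; `sort` plays the role of the Option<ColumnOrdering>
    // passed to order_by in the real code.
    fn scan(self: Box<Self>, sort: bool) -> Vec<u64> {
        let mut rows = self.rows;
        if sort {
            rows.sort_unstable();
        }
        rows
    }
}

fn main() {
    assert_eq!(Box::new(Request { rows: vec![3, 1, 2] }).scan_ordered(), vec![1, 2, 3]);
    assert_eq!(Box::new(Request { rows: vec![3, 1, 2] }).scan_unordered(), vec![3, 1, 2]);
}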
