feat: make scalar index training configurable #2686

Merged · 5 commits · Aug 5, 2024
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/bitmap.rs
@@ -339,7 +339,7 @@ pub async fn train_bitmap_index(
data_source: Box<dyn TrainingSource + Send>,
index_store: &dyn IndexStore,
) -> Result<()> {
- let batches_source = data_source.scan_ordered_chunks(4096).await?;
+ let batches_source = data_source.scan_unordered_chunks(4096).await?;

// mapping from item to list of the row ids where it is present
let dictionary: HashMap<ScalarValue, RowIdTreeMap> = HashMap::new();
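Bitmap training only accumulates, for each distinct value, the set of row ids where it appears, so input order is irrelevant and the switch to the unordered scan above is safe. A minimal sketch of that idea, using plain std collections as stand-ins for the ScalarValue and RowIdTreeMap types in the real code:

```rust
use std::collections::{BTreeSet, HashMap};

// Sketch only: String and BTreeSet<u64> stand in for the ScalarValue and
// RowIdTreeMap types used by the real bitmap index. Set insertion is
// commutative, so the result is identical for any input order.
fn accumulate_bitmaps(
    pairs: impl IntoIterator<Item = (String, u64)>,
) -> HashMap<String, BTreeSet<u64>> {
    let mut dictionary: HashMap<String, BTreeSet<u64>> = HashMap::new();
    for (value, row_id) in pairs {
        dictionary.entry(value).or_default().insert(row_id);
    }
    dictionary
}
```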
20 changes: 20 additions & 0 deletions rust/lance-index/src/scalar/btree.rs
@@ -1064,6 +1064,18 @@ pub trait TrainingSource: Send {
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream>;

/// Returns a stream of batches, in no particular order
///
/// Each batch should have chunk_size rows
///
/// The schema for the batch is slightly flexible.
/// The first column may have any name or type; it holds the values to index.
/// The second column must contain the row ids and must be UInt64Type.
async fn scan_unordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream>;
}

/// Train a btree index from a stream of sorted page-size batches of values and row ids
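For reference, a sketch of a RecordBatch satisfying the scan contract documented above (first column: the values to index, any name or type; second column: UInt64 row ids), built with arrow-rs. Illustrative only; the column names are arbitrary:

```rust
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema};

// Sketch: a batch matching the documented contract. The first column
// (any name / type) holds the values to index; the second holds the
// row ids and must be UInt64.
fn example_training_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("values", DataType::Int32, true),
        Field::new("row_ids", DataType::UInt64, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![10, 20, 30])),
            Arc::new(UInt64Array::from(vec![0, 1, 2])),
        ],
    )
    .unwrap()
}
```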
@@ -1153,6 +1165,14 @@ impl TrainingSource for BTreeUpdater {
)?;
Ok(chunk_concat_stream(unchunked, chunk_size as usize))
}

async fn scan_unordered_chunks(
self: Box<Self>,
_chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
// BTree indices will never use unordered scans
unimplemented!()
}
}

/// A stream that reads the original training data back out of the index
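The unimplemented!() in BTreeUpdater reflects the btree design: training consumes sorted page-size batches (per the doc comment above) and summarizes each page's value range, so lookups can binary-search page boundaries. A toy illustration of why sorted input matters; page_mins is a hypothetical summary, not the index's actual structure:

```rust
// Toy illustration: with sorted input, each page's minimum value is
// ordered, so a lookup can binary-search the page summaries. Unordered
// input would break this invariant, hence `unimplemented!()`.
fn page_for_value(page_mins: &[i32], value: i32) -> usize {
    match page_mins.binary_search(&value) {
        Ok(idx) => idx,
        Err(idx) => idx.saturating_sub(1),
    }
}
```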
37 changes: 26 additions & 11 deletions rust/lance-index/src/scalar/label_list.rs
@@ -1,12 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

- use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
+ use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc};

use arrow::array::AsArray;
use arrow_array::{Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use async_trait::async_trait;
+ use datafusion::execution::RecordBatchStream;
use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
use datafusion_common::ScalarValue;
use deepsize::DeepSizeOf;
@@ -271,17 +272,31 @@ impl TrainingSource for UnnestTrainingSource {
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
let source = self.source.scan_ordered_chunks(chunk_size).await?;
- let unnest_schema = unnest_schema(source.schema().as_ref());
- let unnest_schema_copy = unnest_schema.clone();
- let source = source.try_filter_map(move |batch| {
- std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose())
- });
-
- Ok(Box::pin(RecordBatchStreamAdapter::new(
- unnest_schema_copy.clone(),
- source,
- )))
+ unnest_chunks(source)
}

async fn scan_unordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
let source = self.source.scan_unordered_chunks(chunk_size).await?;
unnest_chunks(source)
}
}

fn unnest_chunks(
source: Pin<Box<dyn RecordBatchStream + Send>>,
) -> Result<SendableRecordBatchStream> {
let unnest_schema = unnest_schema(source.schema().as_ref());
let unnest_schema_copy = unnest_schema.clone();
let source = source.try_filter_map(move |batch| {
std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose())
});

Ok(Box::pin(RecordBatchStreamAdapter::new(
unnest_schema_copy.clone(),
source,
)))
}

/// Trains a new label list index
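The refactor above hoists the unnest logic into the shared unnest_chunks helper so both scan paths can reuse it. For intuition, unnesting turns each row's list of labels into one (label, row id) pair per element; a simplified stand-alone illustration, not the PR's unnest_batch itself:

```rust
// Simplified illustration of unnesting: each row's list of labels
// becomes one (label, row_id) pair per element, so the row id repeats
// once per label. The real unnest_batch does this on Arrow list arrays.
fn unnest(rows: &[(Vec<&str>, u64)]) -> Vec<(String, u64)> {
    rows.iter()
        .flat_map(|(labels, row_id)| {
            labels.iter().map(move |label| (label.to_string(), *row_id))
        })
        .collect()
}

fn main() {
    let unnested = unnest(&[(vec!["cat", "dog"], 0), (vec!["cat"], 1)]);
    assert_eq!(unnested[0], ("cat".to_string(), 0));
    assert_eq!(unnested[1], ("dog".to_string(), 0));
    assert_eq!(unnested[2], ("cat".to_string(), 1));
}
```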
7 changes: 7 additions & 0 deletions rust/lance-index/src/scalar/lance_format.rs
@@ -333,6 +333,13 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
Ok(self.data)
}

async fn scan_unordered_chunks(
self: Box<Self>,
_chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
Ok(self.data)
}
}

async fn train_index(
7 changes: 7 additions & 0 deletions rust/lance/benches/scalar_index.rs
@@ -50,6 +50,13 @@ impl TrainingSource for BenchmarkDataSource {
) -> Result<SendableRecordBatchStream> {
Ok(reader_to_stream(Box::new(Self::test_data())))
}

async fn scan_unordered_chunks(
self: Box<Self>,
_chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
Ok(reader_to_stream(Box::new(Self::test_data())))
}
}

impl BenchmarkFixture {
31 changes: 26 additions & 5 deletions rust/lance/src/index/scalar.rs
@@ -38,22 +38,43 @@ impl TrainingSource for TrainingRequest {
async fn scan_ordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
self.scan_chunks(chunk_size, true).await
}

async fn scan_unordered_chunks(
self: Box<Self>,
chunk_size: u32,
) -> Result<SendableRecordBatchStream> {
self.scan_chunks(chunk_size, false).await
}
}

impl TrainingRequest {
async fn scan_chunks(
self: Box<Self>,
chunk_size: u32,
sort: bool,
) -> Result<SendableRecordBatchStream> {
let mut scan = self.dataset.scan();

let ordering = match sort {
true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]),
false => None,
};

let scan = scan
.with_row_id()
- .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
- self.column.clone(),
- )]))?
+ .order_by(ordering)?
.project(&[&self.column])?;

- let ordered_batches = scan
+ let batches = scan
.try_into_dfstream(LanceExecutionOptions {
use_spilling: true,
..Default::default()
})
.await?;
- Ok(chunk_concat_stream(ordered_batches, chunk_size as usize))
+ Ok(chunk_concat_stream(batches, chunk_size as usize))
}
}

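With both scan methods in place, each index type's training routine can request exactly the scan it needs: btree keeps the sorted scan, while bitmap (and label-list) training can take the cheaper unordered path. A hedged sketch of that dispatch; needs_sorted_input is a hypothetical flag and the import paths are assumptions, not code from this PR:

```rust
use datafusion::physical_plan::SendableRecordBatchStream;
use lance_core::Result; // assumed path for the Result alias used in the diff
use lance_index::scalar::btree::TrainingSource; // trait extended by this PR

// Sketch: choose the scan variant per index type. BTree training needs
// sorted runs; bitmap training only aggregates row ids per value, so it
// can consume batches in any order.
async fn scan_for_training(
    source: Box<dyn TrainingSource + Send>,
    needs_sorted_input: bool,
) -> Result<SendableRecordBatchStream> {
    if needs_sorted_input {
        source.scan_ordered_chunks(4096).await
    } else {
        source.scan_unordered_chunks(4096).await
    }
}
```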