From a220d3bfa932ba7a9413cbecf0bbeeb5d3d18c10 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 4 Aug 2024 22:21:29 +0930 Subject: [PATCH 1/5] perf: add scan_unordered_chunks to TrainingSource trait for indices that do not require sorting --- rust/lance-index/src/scalar/bitmap.rs | 2 +- rust/lance-index/src/scalar/btree.rs | 19 +++++++++++++ rust/lance-index/src/scalar/label_list.rs | 7 +++++ rust/lance-index/src/scalar/lance_format.rs | 7 +++++ rust/lance/benches/scalar_index.rs | 7 +++++ rust/lance/src/index/scalar.rs | 31 +++++++++++++++++---- 6 files changed, 67 insertions(+), 6 deletions(-) diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index 2a9b3c957d..123805ce6b 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -339,7 +339,7 @@ pub async fn train_bitmap_index( data_source: Box, index_store: &dyn IndexStore, ) -> Result<()> { - let batches_source = data_source.scan_ordered_chunks(4096).await?; + let batches_source = data_source.scan_unordered_chunks(4096).await?; // mapping from item to list of the row ids where it is present let dictionary: HashMap = HashMap::new(); diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 3257b936e4..8d72d3b953 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1064,6 +1064,18 @@ pub trait TrainingSource: Send { self: Box, chunk_size: u32, ) -> Result; + + /// Returns a stream of batches + /// + /// Each batch should have chunk_size rows + /// + /// The schema for the batch is slightly flexible. + /// The first column may have any name or type, these are the values to index + /// The second column must be the row ids which must be UInt64Type + async fn scan_unordered_chunks( + self: Box, + chunk_size: u32, + ) -> Result; } /// Train a btree index from a stream of sorted page-size batches of values and row ids @@ -1153,6 +1165,13 @@ impl TrainingSource for BTreeUpdater { )?; Ok(chunk_concat_stream(unchunked, chunk_size as usize)) } + + async fn scan_unordered_chunks( + self: Box, + _chunk_size: u32, + ) -> Result { + unimplemented!() + } } /// A stream that reads the original training data back out of the index diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index e55de8f9cb..025a9b9b58 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -282,6 +282,13 @@ impl TrainingSource for UnnestTrainingSource { source, ))) } + + async fn scan_unordered_chunks( + self: Box, + _chunk_size: u32, + ) -> Result { + unimplemented!() + } } /// Trains a new label list index diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 9dade0db40..bdb3a20c4c 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -333,6 +333,13 @@ mod tests { ) -> Result { Ok(self.data) } + + async fn scan_unordered_chunks( + self: Box, + _chunk_size: u32, + ) -> Result { + Ok(self.data) + } } async fn train_index( diff --git a/rust/lance/benches/scalar_index.rs b/rust/lance/benches/scalar_index.rs index 63949396fd..9bdd734049 100644 --- a/rust/lance/benches/scalar_index.rs +++ b/rust/lance/benches/scalar_index.rs @@ -50,6 +50,13 @@ impl TrainingSource for BenchmarkDataSource { ) -> Result { Ok(reader_to_stream(Box::new(Self::test_data()))) } + + async fn scan_unordered_chunks( + self: Box, + _chunk_size: u32, + ) -> Result { + Ok(reader_to_stream(Box::new(Self::test_data()))) + } } impl BenchmarkFixture { diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index da5281a76a..9a2240e002 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -38,22 +38,43 @@ impl TrainingSource for TrainingRequest { async fn scan_ordered_chunks( self: Box, chunk_size: u32, + ) -> Result { + self._scan_chunks(chunk_size, true).await + } + + async fn scan_unordered_chunks( + self: Box, + chunk_size: u32, + ) -> Result { + self._scan_chunks(chunk_size, false).await + } +} + +impl TrainingRequest { + async fn _scan_chunks( + self: Box, + chunk_size: u32, + order: bool, ) -> Result { let mut scan = self.dataset.scan(); + + let ordering = match order { + true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]), + false => None, + }; + let scan = scan .with_row_id() - .order_by(Some(vec![ColumnOrdering::asc_nulls_first( - self.column.clone(), - )]))? + .order_by(ordering)? .project(&[&self.column])?; - let ordered_batches = scan + let batches = scan .try_into_dfstream(LanceExecutionOptions { use_spilling: true, ..Default::default() }) .await?; - Ok(chunk_concat_stream(ordered_batches, chunk_size as usize)) + Ok(chunk_concat_stream(batches, chunk_size as usize)) } } From 351072a95242409a90155ff3551aa50de9a87a92 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 4 Aug 2024 22:25:48 +0930 Subject: [PATCH 2/5] chore: rename _scan_chunks order argument --- rust/lance/src/index/scalar.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 9a2240e002..87cf368b6a 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -54,11 +54,11 @@ impl TrainingRequest { async fn _scan_chunks( self: Box, chunk_size: u32, - order: bool, + sort: bool, ) -> Result { let mut scan = self.dataset.scan(); - let ordering = match order { + let ordering = match sort { true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]), false => None, }; From 41a09e26798db9b3f641cdc20ad223e675381c95 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 4 Aug 2024 22:41:50 +0930 Subject: [PATCH 3/5] chore: implement scan_unordered_chunks for UnnestTrainingSource --- rust/lance-index/src/scalar/label_list.rs | 34 ++++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index 025a9b9b58..c1869b1c0f 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -1,12 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc}; +use std::{any::Any, collections::HashMap, fmt::Debug, pin::Pin, sync::Arc}; use arrow::array::AsArray; use arrow_array::{Array, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; use async_trait::async_trait; +use datafusion::execution::RecordBatchStream; use datafusion::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use datafusion_common::ScalarValue; use deepsize::DeepSizeOf; @@ -271,26 +272,33 @@ impl TrainingSource for UnnestTrainingSource { chunk_size: u32, ) -> Result { let source = self.source.scan_ordered_chunks(chunk_size).await?; - let unnest_schema = unnest_schema(source.schema().as_ref()); - let unnest_schema_copy = unnest_schema.clone(); - let source = source.try_filter_map(move |batch| { - std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose()) - }); - - Ok(Box::pin(RecordBatchStreamAdapter::new( - unnest_schema_copy.clone(), - source, - ))) + _scan_chunks(source) } async fn scan_unordered_chunks( self: Box, - _chunk_size: u32, + chunk_size: u32, ) -> Result { - unimplemented!() + let source = self.source.scan_unordered_chunks(chunk_size).await?; + _scan_chunks(source) } } +fn _scan_chunks( + source: Pin>, +) -> Result { + let unnest_schema = unnest_schema(source.schema().as_ref()); + let unnest_schema_copy = unnest_schema.clone(); + let source = source.try_filter_map(move |batch| { + std::future::ready(Some(unnest_batch(batch, unnest_schema.clone())).transpose()) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + unnest_schema_copy.clone(), + source, + ))) +} + /// Trains a new label list index pub async fn train_label_list_index( data_source: Box, From 04a066674c8d26d4935653a2e31d3f50c5325c8c Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Sun, 4 Aug 2024 22:47:06 +0930 Subject: [PATCH 4/5] chore: remove unnecessary underscore prefix for scan_chunks --- rust/lance-index/src/scalar/label_list.rs | 6 +++--- rust/lance/src/index/scalar.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index c1869b1c0f..bc19182921 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -272,7 +272,7 @@ impl TrainingSource for UnnestTrainingSource { chunk_size: u32, ) -> Result { let source = self.source.scan_ordered_chunks(chunk_size).await?; - _scan_chunks(source) + scan_chunks(source) } async fn scan_unordered_chunks( @@ -280,11 +280,11 @@ impl TrainingSource for UnnestTrainingSource { chunk_size: u32, ) -> Result { let source = self.source.scan_unordered_chunks(chunk_size).await?; - _scan_chunks(source) + scan_chunks(source) } } -fn _scan_chunks( +fn scan_chunks( source: Pin>, ) -> Result { let unnest_schema = unnest_schema(source.schema().as_ref()); diff --git a/rust/lance/src/index/scalar.rs b/rust/lance/src/index/scalar.rs index 87cf368b6a..dab9afe1a1 100644 --- a/rust/lance/src/index/scalar.rs +++ b/rust/lance/src/index/scalar.rs @@ -39,19 +39,19 @@ impl TrainingSource for TrainingRequest { self: Box, chunk_size: u32, ) -> Result { - self._scan_chunks(chunk_size, true).await + self.scan_chunks(chunk_size, true).await } async fn scan_unordered_chunks( self: Box, chunk_size: u32, ) -> Result { - self._scan_chunks(chunk_size, false).await + self.scan_chunks(chunk_size, false).await } } impl TrainingRequest { - async fn _scan_chunks( + async fn scan_chunks( self: Box, chunk_size: u32, sort: bool, From 5105a0105ff2c0b9197fbc73505cd912ab2c3ad3 Mon Sep 17 00:00:00 2001 From: dsgibbons Date: Tue, 6 Aug 2024 07:14:29 +0930 Subject: [PATCH 5/5] chore: address renaming feedback --- rust/lance-index/src/scalar/btree.rs | 1 + rust/lance-index/src/scalar/label_list.rs | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index 8d72d3b953..e01718344f 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -1170,6 +1170,7 @@ impl TrainingSource for BTreeUpdater { self: Box, _chunk_size: u32, ) -> Result { + // BTree indices will never use unordered scans unimplemented!() } } diff --git a/rust/lance-index/src/scalar/label_list.rs b/rust/lance-index/src/scalar/label_list.rs index bc19182921..2b7a33b5fe 100644 --- a/rust/lance-index/src/scalar/label_list.rs +++ b/rust/lance-index/src/scalar/label_list.rs @@ -272,7 +272,7 @@ impl TrainingSource for UnnestTrainingSource { chunk_size: u32, ) -> Result { let source = self.source.scan_ordered_chunks(chunk_size).await?; - scan_chunks(source) + unnest_chunks(source) } async fn scan_unordered_chunks( @@ -280,11 +280,11 @@ impl TrainingSource for UnnestTrainingSource { chunk_size: u32, ) -> Result { let source = self.source.scan_unordered_chunks(chunk_size).await?; - scan_chunks(source) + unnest_chunks(source) } } -fn scan_chunks( +fn unnest_chunks( source: Pin>, ) -> Result { let unnest_schema = unnest_schema(source.schema().as_ref());