perf: improve v2 scan performance (#2604)
This PR makes several changes:

* The default I/O parallelism is now based on the type of object store
(8 for local filesystems, 64 for cloud object stores); see the first
sketch after this list.
* Individual fragment scans that are part of a larger dataset scan now
share the same ScanScheduler (to avoid over-scheduling I/O).
* The flatten / buffered style of readahead used by the v1 scan did not
yield good I/O parallelism in v2: a fragment would not start scanning
until the previous fragment had finished. v2 now uses a buffered /
flatten / buffered approach instead (see the second sketch after this
list).
* The readahead change required making the scheduler's try_open a
synchronous method, which in turn made the decoder middleware
synchronous. This temporarily breaks v2 zone maps, but they were not
fully wired in anyway. The correct fix is to add an initialize step
between open and the first read; we will do that in a future PR. The
same initialize step can also be used by dictionary & FSST to pre-load
metadata buffers.
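
As a rough illustration of the first bullet, here is a minimal sketch of
the new defaulting behaviour. The function name and the `store_is_local`
flag are stand-ins invented for this sketch; the real default is presumably
derived from the object store that `ScanScheduler::new` now receives, not
from a free function like this one.

```rust
// Illustrative sketch only: not the actual Lance implementation.
fn default_io_parallelism(store_is_local: bool) -> u32 {
    if store_is_local {
        8 // local disks saturate with a handful of concurrent reads
    } else {
        64 // cloud object stores benefit from a much deeper request queue
    }
}

fn main() {
    assert_eq!(default_io_parallelism(true), 8);
    assert_eq!(default_io_parallelism(false), 64);
}
```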
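
And a minimal, self-contained sketch of the buffered / flatten / buffered
readahead shape from the third bullet, written directly against the
`futures` combinators. The fragments and batches are stand-in integers;
this shows the shape of the stream pipeline, not the actual scanner code.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let fragments = vec![0u32, 1, 2];

    // Buffer the fragment "open" futures so several fragments begin
    // scheduling I/O, flatten the resulting batch streams in order, then
    // buffer the per-batch futures for additional read parallelism.
    let batches: Vec<(u32, u32)> = stream::iter(fragments)
        .map(|frag| async move {
            // Stand-in for "open the fragment and schedule its I/O":
            // resolves to a stream of per-batch futures.
            stream::iter(0..3u32).map(move |batch| async move { (frag, batch) })
        })
        .buffered(2) // fragment-level readahead
        .flatten()   // preserve batch order across fragments
        .buffered(4) // batch-level readahead
        .collect()
        .await;

    println!("{batches:?}");
}
```

In the old flatten / buffered arrangement the fragment futures are not
buffered before the flatten, so fragment N+1 is not even opened until
fragment N's batch stream is exhausted, which is the lost I/O parallelism
described above.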
westonpace committed Jul 17, 2024
1 parent 30af1d8 commit 04d9cc5
Showing 24 changed files with 369 additions and 186 deletions.
5 changes: 1 addition & 4 deletions python/src/file.rs
@@ -265,10 +265,7 @@ pub struct LanceFileReader {
impl LanceFileReader {
async fn open(uri_or_path: String) -> PyResult<Self> {
let (object_store, path) = object_store_from_uri_or_path(uri_or_path).await?;
let io_parallelism = std::env::var("IO_THREADS")
.map(|val| val.parse::<u32>().unwrap_or(8))
.unwrap_or(8);
let scheduler = ScanScheduler::new(Arc::new(object_store), io_parallelism);
let scheduler = ScanScheduler::new(Arc::new(object_store));
let file = scheduler.open_file(&path).await.infer_error()?;
let inner = FileReader::try_open(file, None, DecoderMiddlewareChain::default())
.await
2 changes: 1 addition & 1 deletion rust/lance-core/src/utils/tokio.rs
@@ -9,7 +9,7 @@ use futures::{Future, FutureExt};
use tokio::runtime::{Builder, Runtime};
use tracing::Span;

fn get_num_compute_intensive_cpus() -> usize {
pub fn get_num_compute_intensive_cpus() -> usize {
let cpus = num_cpus::get();

if cpus <= *IO_CORE_RESERVATION {
59 changes: 27 additions & 32 deletions rust/lance-encoding-datafusion/src/lib.rs
@@ -7,8 +7,6 @@ use std::{
};

use arrow_schema::DataType;
use futures::future::BoxFuture;
use futures::FutureExt;
use lance_core::{
datatypes::{Field, Schema},
Result,
@@ -95,7 +93,7 @@ impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
chain: DecoderMiddlewareChainCursor<'a>,
) -> Result<(
DecoderMiddlewareChainCursor<'a>,
BoxFuture<'static, Result<Arc<dyn FieldScheduler>>>,
Result<Arc<dyn FieldScheduler>>,
)> {
let is_root = self.initialize();

@@ -119,39 +117,36 @@ impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
None
};
let schema = self.schema.clone();
let io = chain.io().clone();
let _io = chain.io().clone();

let scheduler_fut = async move {
let next = next.await?;
if is_root {
let state = state.unwrap();
let rows_per_map = state.rows_per_map;
let zone_map_buffers = state.zone_map_buffers;
let num_rows = next.num_rows();
if rows_per_map.is_none() {
// No columns had any pushdown info
Ok(next)
} else {
let mut scheduler = ZoneMapsFieldScheduler::new(
next,
schema,
zone_map_buffers,
rows_per_map.unwrap(),
num_rows,
);
// Load all the zone maps from disk
// TODO: it would be slightly more efficient to do this
// later when we know what columns are actually used
// for filtering.
scheduler.initialize(io.as_ref()).await?;
Ok(Arc::new(scheduler) as Arc<dyn FieldScheduler>)
}
let next = next?;
if is_root {
let state = state.unwrap();
let rows_per_map = state.rows_per_map;
let zone_map_buffers = state.zone_map_buffers;
let num_rows = next.num_rows();
if rows_per_map.is_none() {
// No columns had any pushdown info
Ok((chain, Ok(next)))
} else {
Ok(next)
let mut _scheduler = ZoneMapsFieldScheduler::new(
next,
schema,
zone_map_buffers,
rows_per_map.unwrap(),
num_rows,
);
// Load all the zone maps from disk
// TODO: it would be slightly more efficient to do this
// later when we know what columns are actually used
// for filtering.
// scheduler.initialize(io.as_ref()).await?;
// Ok(Arc::new(scheduler) as Arc<dyn FieldScheduler>)
todo!()
}
} else {
Ok((chain, Ok(next)))
}
.boxed();
Ok((chain, scheduler_fut))
}
}

2 changes: 2 additions & 0 deletions rust/lance-encoding-datafusion/src/zone.rs
@@ -583,6 +583,8 @@ mod tests {
};

#[test_log::test(tokio::test)]
#[ignore] // Stats currently disabled until https://github.com/lancedb/lance/issues/2605
// is addressed
async fn test_basic_stats() {
let data = lance_datagen::gen()
.col("0", lance_datagen::array::step::<Int32Type>())
73 changes: 32 additions & 41 deletions rust/lance-encoding/src/decoder.rs
@@ -220,8 +220,8 @@ use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field as ArrowField, Fields, Schema as ArrowSchema};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesOrdered};
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt};
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{Field, Schema};
use log::trace;
@@ -368,7 +368,7 @@ pub struct DecoderMiddlewareChainCursor<'a> {

pub type ChosenFieldScheduler<'a> = (
DecoderMiddlewareChainCursor<'a>,
BoxFuture<'static, Result<Arc<dyn FieldScheduler>>>,
Result<Arc<dyn FieldScheduler>>,
);

impl<'a> DecoderMiddlewareChainCursor<'a> {
@@ -611,7 +611,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
&primitive_col,
buffers,
)?;
return Ok((chain, std::future::ready(Ok(scheduler)).boxed()));
return Ok((chain, Ok(scheduler)));
}
match &data_type {
DataType::FixedSizeList(inner, _dimension) => {
@@ -625,7 +625,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
&primitive_col,
buffers,
)?;
return Ok((chain, std::future::ready(Ok(scheduler)).boxed()));
Ok((chain, Ok(scheduler)))
} else {
todo!()
}
@@ -644,6 +644,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
column_infos,
buffers,
)?;
let items_scheduler = items_scheduler?;

let (inner_infos, null_offset_adjustments): (Vec<_>, Vec<_>) = offsets_column
.page_infos
@@ -684,48 +685,39 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
DataType::Int64
};
let items_type = items_field.data_type().clone();
let list_scheduler_fut = async move {
let items_scheduler = items_scheduler.await?;
Ok(Arc::new(ListFieldScheduler::new(
inner,
items_scheduler,
item_field_name.clone(),
items_type,
offset_type,
null_offset_adjustments,
)) as Arc<dyn FieldScheduler>)
}
.boxed();
Ok((chain, list_scheduler_fut))
let list_scheduler = Ok(Arc::new(ListFieldScheduler::new(
inner,
items_scheduler,
item_field_name.clone(),
items_type,
offset_type,
null_offset_adjustments,
)) as Arc<dyn FieldScheduler>);
Ok((chain, list_scheduler))
}
DataType::Struct(fields) => {
let column_info = column_infos.pop_front().unwrap();
Self::check_simple_struct(&column_info, chain.current_path()).unwrap();
let (chain, child_schedulers) = field.children.iter().enumerate().try_fold(
(chain, FuturesOrdered::new()),
|(chain, mut fields), (field_idx, field)| {
let (chain, field) =
chain.new_child(field_idx as u32, field, column_infos, buffers)?;
fields.push_back(field);
Result::Ok((chain, fields))
},
)?;
let mut child_schedulers = Vec::with_capacity(field.children.len());
let mut chain = chain;
for (i, field) in field.children.iter().enumerate() {
let (next_chain, field_scheduler) =
chain.new_child(i as u32, field, column_infos, buffers)?;
child_schedulers.push(field_scheduler?);
chain = next_chain;
}

let fields = fields.clone();
let struct_fut = async move {
let child_schedulers = child_schedulers.try_collect::<Vec<_>>().await?;
Ok(
Arc::new(SimpleStructScheduler::new(child_schedulers, fields))
as Arc<dyn FieldScheduler>,
)
}
.boxed();
let struct_scheduler = Ok(Arc::new(SimpleStructScheduler::new(
child_schedulers,
fields,
)) as Arc<dyn FieldScheduler>);

// For now, we don't record nullability for structs. As a result, there is always
// only one "page" of struct data. In the future, this will change. A null-aware
// struct scheduler will need to first calculate how many rows are in the struct page
// and then find the child pages that overlap. This should be doable.
Ok((chain, struct_fut))
Ok((chain, struct_scheduler))
}
// TODO: Still need support for dictionary / RLE
_ => chain.next(field, column_infos, buffers),
@@ -763,7 +755,7 @@ fn root_column(num_rows: u64) -> ColumnInfo {
impl DecodeBatchScheduler {
/// Creates a new decode scheduler with the expected schema and the column
/// metadata of the file.
pub async fn try_new<'a>(
pub fn try_new<'a>(
schema: &'a Schema,
column_infos: &[Arc<ColumnInfo>],
file_buffer_positions_and_sizes: &'a Vec<(u64, u64)>,
@@ -781,11 +773,11 @@ impl DecodeBatchScheduler {
columns.extend(column_infos.iter().map(|col| col.as_ref().clone()));
let root_type = DataType::Struct(root_fields.clone());
let root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
let (_, root_scheduler_fut) =
let (_, root_scheduler) =
decoder_strategy
.cursor(io)
.start(&root_field, &mut columns, buffers)?;
let root_scheduler = root_scheduler_fut.await?;
let root_scheduler = root_scheduler?;
Ok(Self {
root_scheduler,
root_fields,
@@ -1427,8 +1419,7 @@ pub async fn decode_batch(
batch.num_rows,
field_decoder_strategy,
&io_scheduler,
)
.await?;
)?;
let (tx, rx) = unbounded_channel();
decode_scheduler.schedule_range(0..batch.num_rows, filter, tx, io_scheduler);
#[allow(clippy::single_range_in_vec_init)]
1 change: 0 additions & 1 deletion rust/lance-encoding/src/testing.rs
@@ -80,7 +80,6 @@ async fn test_decode(
&DecoderMiddlewareChain::default(),
io,
)
.await
.unwrap();

let (tx, rx) = mpsc::unbounded_channel();
2 changes: 1 addition & 1 deletion rust/lance-file/benches/reader.rs
@@ -44,7 +44,7 @@ fn bench_reader(c: &mut Criterion) {
let file_path = &file_path;
let data = &data;
rt.block_on(async move {
let store_scheduler = ScanScheduler::new(Arc::new(object_store.clone()), 8);
let store_scheduler = ScanScheduler::new(Arc::new(object_store.clone()));
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
let reader = FileReader::try_open(
scheduler.clone(),