Skip to content

Commit

Permalink
perf: coalesce ids before executing take (#2680)
Browse files Browse the repository at this point in the history
Late materialization is a great benefit when executing a highly
selective filter. However, if a filter is highly selective it means that
each input batch will probably only have a few matching rows. The
current implementation executes take for each filtered batch. E.g.
instead of a single call to `take(500, 10000, 300000)` we get three
calls: `take(500)`, `take(10000)`, and `take(300000)`. This means:

 * We can't coalesce
 * More CPU overhead (many calls to take_ranges)
 * Very small output batches (user's batch size is not respected)

On cloud storage I see a more than 10x improvement in scan performance.

We have a benchmark for this (EDA search plot 4) which should help
prevent regressions in the future:
https://bencher.dev/console/projects/weston-lancedb/plots
  • Loading branch information
westonpace committed Aug 13, 2024
1 parent 8cfb6a5 commit 711bad7
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 125 deletions.
30 changes: 28 additions & 2 deletions python/python/benchmarks/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def create_table(num_rows, offset) -> pa.Table:
"vector": vectors,
"filterable": filterable,
"category": categories,
"category_no_index": categories,
"genres": genres,
}
)
Expand All @@ -77,9 +78,11 @@ def create_base_dataset(data_dir: Path) -> lance.LanceDataset:
rows_remaining -= next_batch_length
table = create_table(next_batch_length, offset)
if offset == 0:
dataset = lance.write_dataset(table, tmp_path)
dataset = lance.write_dataset(table, tmp_path, use_legacy_format=False)
else:
dataset = lance.write_dataset(table, tmp_path, mode="append")
dataset = lance.write_dataset(
table, tmp_path, mode="append", use_legacy_format=False
)
offset += next_batch_length

dataset.create_index(
Expand Down Expand Up @@ -479,3 +482,26 @@ def test_label_list_index_prefilter(test_dataset, benchmark, filter: str):
prefilter=True,
filter=filter,
)


@pytest.mark.benchmark(group="late_materialization")
@pytest.mark.parametrize(
    "use_index",
    (False, True),
    ids=["no_index", "with_index"],
)
def test_late_materialization(test_dataset, benchmark, use_index):
    """Benchmark a highly selective filtered scan of a wide (vector) column.

    The filter column either has a scalar index ("category") or does not
    ("category_no_index"), so both late-materialization paths are covered.
    """
    filter_column = "category" if use_index else "category_no_index"
    predicate = f"{filter_column} = 0"
    # Dump the execution plan so the benchmark log shows which scan
    # strategy (index vs. full filter) was actually chosen.
    plan = test_dataset.scanner(
        columns=["vector"],
        filter=predicate,
        batch_size=32,
    ).explain_plan(True)
    print(plan)
    benchmark(
        test_dataset.to_table,
        columns=["vector"],
        filter=predicate,
        batch_size=32,
    )
12 changes: 12 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import lance
import lance.fragment
import numpy as np
import pandas as pd
import pandas.testing as tm
import polars as pl
Expand Down Expand Up @@ -1994,3 +1995,14 @@ def test_legacy_dataset(tmp_path: Path):

fragment = list(dataset.get_fragments())[0]
assert "minor_version: 3" not in format_fragment(fragment.metadata, dataset)


def test_late_materialization_batch_size(tmp_path: Path):
    """Filtered scans must still honor the user-requested batch size.

    A selective filter keeps half of 1024 rows (512), which divides evenly
    into 16 batches of 32 — so every emitted batch must be exactly 32 rows,
    proving that take calls are coalesced instead of producing tiny batches.
    """
    num_rows = 32 * 32
    row_ids = np.arange(num_rows)
    table = pa.table({"filter": row_ids, "values": row_ids})
    dataset = lance.write_dataset(
        table, tmp_path, data_storage_version="stable", max_rows_per_file=10000
    )
    batches = dataset.to_batches(
        columns=["values"], filter="filter % 2 == 0", batch_size=32
    )
    for batch in batches:
        assert batch.num_rows == 32
8 changes: 7 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,13 @@ impl DecodeBatchScheduler {
mut schedule_action: impl FnMut(Result<DecoderMessage>),
) {
let rows_requested = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
trace!("Scheduling ranges {:?} ({} rows)", ranges, rows_requested);
trace!(
"Scheduling {} ranges across {}..{} ({} rows)",
ranges.len(),
ranges.first().unwrap().start,
ranges.last().unwrap().end,
rows_requested
);

let mut context = SchedulerContext::new(io);
let maybe_root_job = self.root_scheduler.schedule_ranges(ranges, filter);
Expand Down
Loading

0 comments on commit 711bad7

Please sign in to comment.