From 08cd61130fdac89efc030d3b534dc5fe591da8ff Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 5 Aug 2024 07:05:15 -0700 Subject: [PATCH] Fix tests that were expecting old behavior --- rust/lance/src/dataset/scanner.rs | 275 +++++++++++++++++------------- 1 file changed, 154 insertions(+), 121 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index cf1ccc5dea..ce6a5215fd 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1915,10 +1915,18 @@ mod test { if use_filter { builder.filter("i IS NOT NULL").unwrap(); } + let expected_lens = if use_filter { + // If there is a filter then a late materialization pass is added + // and that uses coalesce batches which can yield slightly larger + // than requested batch sizes. + dbg!(vec![8, 10, 10, 10, 10, 10, 10]) + } else { + dbg!(vec![8, 2, 8, 2, 8, 2, 8, 2, 8, 2]) + }; let mut stream = builder.try_into_stream().await.unwrap(); - for expected_len in [8, 2, 8, 2, 8, 2, 8, 2, 8, 2] { + for expected_len in expected_lens { assert_eq!( - stream.next().await.unwrap().unwrap().num_rows(), + dbg!(stream.next().await.unwrap().unwrap().num_rows()), expected_len as usize ); } @@ -4080,8 +4088,9 @@ mod test { }, "ProjectionExec: expr=[s@2 as s] Take: columns=\"i, _rowid, s\" - FilterExec: i@0 > 10 AND i@0 < 20 - LanceScan: uri..., projection=[i], row_id=true, row_addr=false, ordered=true", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: i@0 > 10 AND i@0 < 20 + LanceScan: uri..., projection=[i], row_id=true, row_addr=false, ordered=true", ) .await?; @@ -4100,10 +4109,11 @@ mod test { |scan| scan.nearest("vec", &q, 5), "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, i, s\" - FilterExec: _distance@2 IS NOT NULL - SortExec: TopK(fetch=5), expr=... - KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@2 IS NOT NULL + SortExec: TopK(fetch=5), expr=... + KNNVectorDistance: metric=l2 + LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false", ) .await?; @@ -4115,9 +4125,10 @@ mod test { |scan| scan.nearest("vec", &q, 42), "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance] Take: columns=\"_distance, _rowid, i, s, vec\" - SortExec: TopK(fetch=42), expr=... - ANNSubIndex: name=..., k=42, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=42), expr=... + ANNSubIndex: name=..., k=42, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await?; @@ -4126,13 +4137,15 @@ mod test { |scan| Ok(scan.nearest("vec", &q, 10)?.refine(4)), "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=10), expr=... - KNNVectorDistance: metric=l2 - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=40), expr=... - ANNSubIndex: name=..., k=40, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=10), expr=... + KNNVectorDistance: metric=l2 + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=40), expr=... + ANNSubIndex: name=..., k=40, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await?; @@ -4142,10 +4155,11 @@ mod test { |scan| Ok(scan.nearest("vec", &q, 13)?.use_index(false)), "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=13), expr=... - KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=13), expr=... + KNNVectorDistance: metric=l2 + LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false", ) .await?; @@ -4161,11 +4175,13 @@ mod test { }, "ProjectionExec: expr=[s@3 as s, vec@4 as vec, _distance@0 as _distance, _rowid@1 as _rowid] Take: columns=\"_distance, _rowid, i, s, vec\" - FilterExec: i@2 > 10 - Take: columns=\"_distance, _rowid, i\" - SortExec: TopK(fetch=17), expr=... - ANNSubIndex: name=..., k=17, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: i@2 > 10 + Take: columns=\"_distance, _rowid, i\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=17), expr=... + ANNSubIndex: name=..., k=17, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await?; @@ -4180,11 +4196,12 @@ mod test { }, "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance] Take: columns=\"_distance, _rowid, i, s, vec\" - SortExec: TopK(fetch=17), expr=... - ANNSubIndex: name=..., k=17, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1 - FilterExec: i@0 > 10 - LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false", + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=17), expr=... + ANNSubIndex: name=..., k=17, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1 + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false", ) .await?; @@ -4196,20 +4213,22 @@ mod test { // by doing it as part of the last Take. This would likely have minimal impact though. "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... - KNNVectorDistance: metric=l2 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 - UnionExec - ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... - KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=6), expr=... - ANNSubIndex: name=..., k=6, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=... + KNNVectorDistance: metric=l2 + LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=6), expr=... + ANNSubIndex: name=..., k=6, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await?; @@ -4219,22 +4238,25 @@ mod test { |scan| scan.nearest("vec", &q, 15)?.filter("i > 10"), "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: i@3 > 10 - Take: columns=\"_rowid, vec, _distance, i\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=15), expr=... - KNNVectorDistance: metric=l2 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 - UnionExec - ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=15), expr=... - KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=15), expr=... - ANNSubIndex: name=..., k=15, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: i@3 > 10 + Take: columns=\"_rowid, vec, _distance, i\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... + KNNVectorDistance: metric=l2 + LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=15), expr=... + ANNSubIndex: name=..., k=15, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await?; @@ -4251,23 +4273,25 @@ mod test { // only to be taken again later. We should fix this. "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=5), expr=... - KNNVectorDistance: metric=l2 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 - UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=5), expr=... - KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=5), expr=... - ANNSubIndex: name=..., k=5, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1 - FilterExec: i@0 > 10 - LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=5), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=5), expr=... + KNNVectorDistance: metric=l2 + FilterExec: i@1 > 10 + LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=5), expr=... + ANNSubIndex: name=..., k=5, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1 + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false", ) .await?; @@ -4287,10 +4311,11 @@ mod test { }, "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance] Take: columns=\"_distance, _rowid, i, s, vec\" - SortExec: TopK(fetch=5), expr=... - ANNSubIndex: name=..., k=5, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1 - ScalarIndexQuery: query=i > 10", + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=5), expr=... + ANNSubIndex: name=..., k=5, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1 + ScalarIndexQuery: query=i > 10", ) .await?; @@ -4306,22 +4331,24 @@ mod test { }, "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=8), expr=... - KNNVectorDistance: metric=l2 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 - UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=8), expr=... - KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=8), expr=... - ANNSubIndex: name=..., k=8, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1 - ScalarIndexQuery: query=i > 10", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + FilterExec: i@1 > 10 + LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=8), expr=... + ANNSubIndex: name=..., k=8, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1 + ScalarIndexQuery: query=i > 10", ) .await?; @@ -4337,22 +4364,24 @@ mod test { }, "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, s\" - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=11), expr=... - KNNVectorDistance: metric=l2 - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 - UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] - FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=11), expr=... - KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false - Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=11), expr=... - ANNSubIndex: name=..., k=11, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1 - ScalarIndexQuery: query=i > 10", + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + FilterExec: i@1 > 10 + LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false + Take: columns=\"_distance, _rowid, vec\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=11), expr=... + ANNSubIndex: name=..., k=11, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1 + ScalarIndexQuery: query=i > 10", ) .await?; @@ -4363,7 +4392,8 @@ mod test { |scan| scan.project(&["s"])?.filter("i > 10"), "ProjectionExec: expr=[s@1 as s] Take: columns=\"_rowid, s\" - MaterializeIndex: query=i > 10", + CoalesceBatchesExec: target_batch_size=8192 + MaterializeIndex: query=i > 10", ) .await?; @@ -4390,7 +4420,8 @@ mod test { RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec Take: columns=\"_rowid, s\" - MaterializeIndex: query=i > 10 + CoalesceBatchesExec: target_batch_size=8192 + MaterializeIndex: query=i > 10 ProjectionExec: expr=[_rowid@2 as _rowid, s@0 as s] FilterExec: i@1 > 10 LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false", @@ -4449,7 +4480,8 @@ mod test { RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec Take: columns=\"_rowid, s\" - MaterializeIndex: query=i > 10 + CoalesceBatchesExec: target_batch_size=8192 + MaterializeIndex: query=i > 10 ProjectionExec: expr=[_rowid@2 as _rowid, s@0 as s] FilterExec: i@1 > 10 LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false", @@ -4521,9 +4553,10 @@ mod test { KNNVectorDistance: metric=l2 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false Take: columns=\"_distance, _rowid, vec\" - SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST]... - ANNSubIndex: name=idx, k=34, deltas=1 - ANNIvfPartition: uuid=..., nprobes=1, deltas=1", + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST]... + ANNSubIndex: name=idx, k=34, deltas=1 + ANNIvfPartition: uuid=..., nprobes=1, deltas=1", ) .await .unwrap();