Skip to content

Commit

Permalink
Use OffsetIndex to prune IO with RowSelection (#2473)
Browse files Browse the repository at this point in the history
* Add struct for in-memory row group with only selected pages

* Read only pages required for row selection

* Remove InMemoryColumnChumk and prune IO for row selection

* Review comments

* Unignore test

* Avoid copies

* Fix docs

* Linting
  • Loading branch information
thinkharderdev committed Aug 17, 2022
1 parent 42e9531 commit 2185ce2
Show file tree
Hide file tree
Showing 4 changed files with 384 additions and 296 deletions.
147 changes: 144 additions & 3 deletions parquet/src/arrow/arrow_reader/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

use arrow::array::{Array, BooleanArray};
use arrow::compute::SlicesIterator;
use parquet_format::PageLocation;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;

/// [`RowSelector`] represents a range of rows to scan from a parquet file
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
/// scanning a parquet file
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RowSelector {
/// The number of rows
pub row_count: usize,
Expand Down Expand Up @@ -116,6 +118,57 @@ impl RowSelection {
Self { selectors }
}

/// Given an offset index, return the offset ranges for all data pages selected by `self`
pub(crate) fn scan_ranges(
&self,
page_locations: &[PageLocation],
) -> Vec<Range<usize>> {
let mut ranges = vec![];
let mut row_offset = 0;

let mut pages = page_locations.iter().peekable();
let mut selectors = self.selectors.iter().cloned();
let mut current_selector = selectors.next();
let mut current_page = pages.next();

let mut current_page_included = false;

while let Some((selector, page)) = current_selector.as_mut().zip(current_page) {
if !(selector.skip || current_page_included) {
let start = page.offset as usize;
let end = start + page.compressed_page_size as usize;
ranges.push(start..end);
current_page_included = true;
}

if let Some(next_page) = pages.peek() {
if row_offset + selector.row_count > next_page.first_row_index as usize {
let remaining_in_page =
next_page.first_row_index as usize - row_offset;
selector.row_count -= remaining_in_page;
row_offset += remaining_in_page;
current_page = pages.next();
current_page_included = false;

continue;
} else {
if row_offset + selector.row_count
== next_page.first_row_index as usize
{
current_page = pages.next();
current_page_included = false;
}
row_offset += selector.row_count;
current_selector = selectors.next();
}
} else {
break;
}
}

ranges
}

/// Splits off the first `row_count` from this [`RowSelection`]
pub fn split_off(&mut self, row_count: usize) -> Self {
let mut total_count = 0;
Expand Down Expand Up @@ -162,7 +215,7 @@ impl RowSelection {
/// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
/// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN
///
/// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
/// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
///
///
pub fn and_then(&self, other: &Self) -> Self {
Expand Down Expand Up @@ -423,4 +476,92 @@ mod tests {
assert_eq!(a.and_then(&b), expected);
}
}

#[test]
fn test_scan_ranges() {
let index = vec![
PageLocation {
offset: 0,
compressed_page_size: 10,
first_row_index: 0,
},
PageLocation {
offset: 10,
compressed_page_size: 10,
first_row_index: 10,
},
PageLocation {
offset: 20,
compressed_page_size: 10,
first_row_index: 20,
},
PageLocation {
offset: 30,
compressed_page_size: 10,
first_row_index: 30,
},
PageLocation {
offset: 40,
compressed_page_size: 10,
first_row_index: 40,
},
PageLocation {
offset: 50,
compressed_page_size: 10,
first_row_index: 50,
},
PageLocation {
offset: 60,
compressed_page_size: 10,
first_row_index: 60,
},
];

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select across page boundaries
RowSelector::select(12),
// Skip final page
RowSelector::skip(12),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, false]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select across page boundaries
RowSelector::select(12),
RowSelector::skip(1),
// Select across page boundaries including final page
RowSelector::select(8),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
Loading

0 comments on commit 2185ce2

Please sign in to comment.