Refactor logic for unnest filters/query filters and add tests
pranavbhole committed Jul 31, 2023
1 parent 6ad7f27 commit dd1f92d
Showing 4 changed files with 266 additions and 86 deletions.
@@ -311,71 +311,30 @@ to generate filters to be passed to base cursor (filtersPushedDownToBaseCursor)
filtersPushedDownToBaseCursor -> null (as the filter cannot be re-written due to the presence of virtual columns)
filtersForPostUnnestCursor -> d12 IN (a,b) or m1 < 10
*/
class FilterSplitter
{
final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}
}

final FilterSplitter filterSplitter = new FilterSplitter();
final FilterSplitter filterSplitter = new FilterSplitter(inputColumn, inputColumnCapabilites, queryVirtualColumns);

if (queryFilter != null) {
if (queryFilter.getRequiredColumns().contains(outputColumnName)) {
// the outer filter contains the unnested column
// requires a check for OR and AND filters; disqualify the rewrite for non-unnest filters
if (queryFilter instanceof BooleanFilter) {
int originalFilterCount = Filters.countNumberOfFilters(queryFilter);
boolean isTopLevelAndFilter = queryFilter instanceof AndFilter;
List<Filter> preFilterList = recursiveRewriteOnUnnestFilters(
(BooleanFilter) queryFilter,
inputColumn,
inputColumnCapabilites
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
);
if (queryFilter instanceof AndFilter) {
filterSplitter.addPreFilter(new AndFilter(preFilterList));
} else if (queryFilter instanceof OrFilter) {
filterSplitter.addPreFilter(new OrFilter(preFilterList));
int preFilterSize = preFilterList.stream().map(f -> Filters.countNumberOfFilters(f)).mapToInt(Integer::intValue).sum();
// If the rewrite on the entire query filter is successful, then add the entire filter to preFilter; else skip it and only add it to the post filter.
if (originalFilterCount == preFilterSize) {
if (queryFilter instanceof AndFilter) {
filterSplitter.addPreFilter(new AndFilter(preFilterList));
} else if (queryFilter instanceof OrFilter) {
filterSplitter.addPreFilter(new OrFilter(preFilterList));
}
}
// add the entire query filter to the post-unnest filters, to be used in the ValueMatcher
filterSplitter.addPostFilterWithPreFilterIfRewritePossible(queryFilter, true);
@@ -397,9 +356,81 @@ void addPreFilter(@Nullable final Filter filter)
);
}

class FilterSplitter
{
private String inputColumn;
private ColumnCapabilities inputColumnCapabilites;
private VirtualColumns queryVirtualColumns;

public FilterSplitter(
String inputColumn,
ColumnCapabilities inputColumnCapabilites, VirtualColumns queryVirtualColumns
)
{
this.inputColumn = inputColumn;
this.inputColumnCapabilites = inputColumnCapabilites;
this.queryVirtualColumns = queryVirtualColumns;
}

final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}
}
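
To make the split concrete, here is a minimal sketch of the routing that addPreFilter performs (illustrative only, not part of this commit; it assumes the sdf test helper added to Filters further down in this diff, and a query whose virtual columns include an ExpressionVirtualColumn named "vc"):

// Assumed imports: org.apache.druid.query.filter.Filter; org.apache.druid.segment.filter.Filters
// A filter on a plain segment column references no query virtual column, so it lands in
// filtersPushedDownToBaseCursor and can take advantage of segment indexes before unnesting:
Filter onSegmentColumn = Filters.sdf("dim1", "a");   // -> filtersPushedDownToBaseCursor
// A filter that references the query virtual column "vc" cannot be evaluated against the
// base segment, so it lands in filtersForPostUnnestCursor and is evaluated by a ValueMatcher
// after unnesting:
Filter onVirtualColumn = Filters.sdf("vc", "3");     // -> filtersForPostUnnestCursor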

/**
* Handles the nested rewrite for unnest columns in a recursive way;
* it loops through all AND/OR filters and rewrites only the required filters in the child, skipping the others
* it loops through all AND/OR filters, rewrites only the required filters in the child, and adds them to preFilter if qualified,
* or else skips adding them to preFilters.
* RULES (a worked sketch of these rules follows this file's diff):
* 1. Add to preFilters only when the top-level filter is an AND.
* for example: a=1 and (b=2 or c=2). In this case a=1 can be added as a preFilter, but we cannot add b=2 as a preFilter.
* 2. If the top level is an OR filter, then we either add the entire top-level OR filter to the preFilters or skip it altogether.
* for example: a=1 or (b=2 and c=2)
* 3. Filters on an unnest column that is derived from an array or any other expression cannot be pushed down to the base.
* for example: a=1 and vc=3, where vc is an ExpressionVirtualColumn; vc=3 cannot be pushed down to the base even if the top level is an AND filter.
* 4.
*
* @param queryFilter query filter passed to makeCursors
* @param inputColumn input column to unnest if it's a direct access; otherwise null
@@ -408,7 +439,9 @@ void addPreFilter(@Nullable final Filter filter)
private List<Filter> recursiveRewriteOnUnnestFilters(
BooleanFilter queryFilter,
final String inputColumn,
final ColumnCapabilities inputColumnCapabilites
final ColumnCapabilities inputColumnCapabilites,
final FilterSplitter filterSplitter,
final boolean isTopLevelAndFilter
)
{
final List<Filter> preFilterList = new ArrayList<>();
@@ -418,14 +451,20 @@ private List<Filter> recursiveRewriteOnUnnestFilters(
preFilterList.add(new AndFilter(recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
)));
} else if (filter instanceof OrFilter) {
preFilterList.add(new OrFilter(recursiveRewriteOnUnnestFilters(
// in the case of OR filters, we set isTopLevelAndFilter to false, which prevents pushing down any child filters to the base
List<Filter> orChildFilters = recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites
)));
inputColumnCapabilites,
filterSplitter,
false
);
preFilterList.add(new OrFilter(orChildFilters));
} else {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
filter,
@@ -434,8 +473,13 @@ private List<Filter> recursiveRewriteOnUnnestFilters(
);
if (newFilter != null) {
preFilterList.add(newFilter);
} else {
preFilterList.add(filter);
}
/*
Push the filters down to the base only if the top-level filter is an AND filter;
we cannot push down if the top-level filter is an OR or the unnest column is a derived expression such as an array.
*/
if (isTopLevelAndFilter && getUnnestInputIfDirectAccess(unnestColumn) != null) {
filterSplitter.addPreFilter(newFilter != null ? newFilter : filter);
}
}
} else {
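A worked sketch of rules 1-3 above (illustrative only, not part of this file's diff; it uses the AndFilter/OrFilter list constructors already shown in this diff and the sdf helper added to Filters in this commit, and it assumes the unnest input is a direct column so getUnnestInputIfDirectAccess returns non-null):

// Assumed imports: java.util.Arrays; org.apache.druid.query.filter.Filter;
// org.apache.druid.segment.filter.AndFilter; org.apache.druid.segment.filter.Filters;
// org.apache.druid.segment.filter.OrFilter
Filter a = Filters.sdf("dim1", "1");
Filter b = Filters.sdf("dim2", "2");
Filter c = Filters.sdf("dim3", "2");

// Rule 1: top-level AND -> a=1 can be pushed down on its own; the OR child is kept together
// and is pushed down only if every one of its leaves can be rewritten.
Filter topAnd = new AndFilter(Arrays.asList(a, new OrFilter(Arrays.asList(b, c))));

// Rule 2: top-level OR -> the filter can only be pushed down as a whole, i.e. when the rewrite
// covers all leaves (originalFilterCount == preFilterSize); otherwise it is applied post-unnest only.
Filter topOr = new OrFilter(Arrays.asList(a, new AndFilter(Arrays.asList(b, c))));

// Rule 3: a leaf on a column derived from an array or other expression (e.g. an
// ExpressionVirtualColumn) is never pushed down to the base, even under a top-level AND.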
@@ -26,11 +26,14 @@
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.BooleanFilter;
import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.FilterTuning;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelector;
@@ -422,4 +425,33 @@ private static LinkedHashSet<Filter> flattenOrChildren(final Collection<Filter>

return retVal;
}

public static int countNumberOfFilters(Filter filter)
{
if (filter == null) {
return 0;
}
if (filter instanceof BooleanFilter) {
return ((BooleanFilter) filter).getFilters()
.stream()
.map(f -> countNumberOfFilters(f))
.mapToInt(Integer::intValue)
.sum();
}
return 1;
}

public static Filter sdf(String dimension, String value)
{
return new SelectorDimFilter(dimension, value, null).toFilter();
}

public static Filter sdf(String dimension, String value, ExtractionFn extractionFn)
{
return new SelectorDimFilter(dimension, value, extractionFn).toFilter();
}
public static DimFilter sdfd(String dimension, String value, ExtractionFn extractionFn)
{
return new SelectorDimFilter(dimension, value, extractionFn);
}
}
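
A small usage sketch for the new helpers (illustrative only, not part of this commit): countNumberOfFilters counts the leaf filters of a nested BooleanFilter, which is what the unnest code above compares against the size of the rewritten pre-filter list to decide whether a whole AND/OR filter may be pushed down.

// Assumed imports: java.util.Arrays; org.apache.druid.query.filter.Filter;
// org.apache.druid.segment.filter.AndFilter; org.apache.druid.segment.filter.OrFilter
Filter nested = new AndFilter(Arrays.asList(
    Filters.sdf("dim1", "a"),
    new OrFilter(Arrays.asList(Filters.sdf("dim2", "b"), Filters.sdf("m1", "10")))
));
// Three leaf filters in total; the BooleanFilter nodes themselves are not counted.
int leafCount = Filters.countNumberOfFilters(nested);   // 3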