Skip to content

Commit

Permalink
fix filtering bug in filtering unnest cols and dim cols: Received a n…
Browse files Browse the repository at this point in the history
…on-applicable rewrite (#14587)
  • Loading branch information
pranavbhole authored Aug 17, 2023
1 parent f585f0a commit 26d82fd
Show file tree
Hide file tree
Showing 4 changed files with 741 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.AndFilter;
import org.apache.druid.segment.filter.BoundFilter;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.LikeFilter;
Expand Down Expand Up @@ -314,84 +315,29 @@ to generate filters to be passed to base cursor (filtersPushedDownToBaseCursor)
filtersPushedDownToBaseCursor -> null (as the filter cannot be re-written due to presence of virtual columns)
filtersForPostUnnestCursor -> d12 IN (a,b) or m1 < 10
*/
class FilterSplitter
{
final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}
}

final FilterSplitter filterSplitter = new FilterSplitter();
final FilterSplitter filterSplitter = new FilterSplitter(inputColumn, inputColumnCapabilites, queryVirtualColumns);

if (queryFilter != null) {
List<Filter> preFilterList = new ArrayList<>();
final int origFilterSize;
if (queryFilter.getRequiredColumns().contains(outputColumnName)) {
// outside filter contains unnested column
// requires check for OR
if (queryFilter instanceof OrFilter) {
origFilterSize = ((OrFilter) queryFilter).getFilters().size();
for (Filter filter : ((OrFilter) queryFilter).getFilters()) {
if (filter.getRequiredColumns().contains(outputColumnName)) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
filter,
inputColumn,
inputColumnCapabilites
);
if (newFilter != null) {
preFilterList.add(newFilter);
}
} else {
preFilterList.add(filter);
// requires check for OR and And filters, disqualify rewrite for non-unnest filters
if (queryFilter instanceof BooleanFilter) {
boolean isTopLevelAndFilter = queryFilter instanceof AndFilter;
List<Filter> preFilterList = recursiveRewriteOnUnnestFilters(
(BooleanFilter) queryFilter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
);
// If rewite on entire query filter is successful then add entire filter to preFilter else skip and only add to post filter.
if (filterSplitter.getPreFilterCount() == filterSplitter.getOriginalFilterCount()) {
if (queryFilter instanceof AndFilter) {
filterSplitter.addPreFilter(new AndFilter(preFilterList));
} else if (queryFilter instanceof OrFilter) {
filterSplitter.addPreFilter(new OrFilter(preFilterList));
}
}
if (preFilterList.size() == origFilterSize) {
// there has been successful rewrites
final OrFilter preOrFilter = new OrFilter(preFilterList);
filterSplitter.addPreFilter(preOrFilter);
}
// add the entire query filter to unnest filter to be used in Value matcher
filterSplitter.addPostFilterWithPreFilterIfRewritePossible(queryFilter, true);
} else {
Expand All @@ -412,7 +358,173 @@ void addPreFilter(@Nullable final Filter filter)
);
}

class FilterSplitter
{
private String inputColumn;
private ColumnCapabilities inputColumnCapabilites;
private VirtualColumns queryVirtualColumns;

private int originalFilterCount = 0;
private int preFilterCount = 0;

public FilterSplitter(
String inputColumn,
ColumnCapabilities inputColumnCapabilites,
VirtualColumns queryVirtualColumns
)
{
this.inputColumn = inputColumn;
this.inputColumnCapabilites = inputColumnCapabilites;
this.queryVirtualColumns = queryVirtualColumns;
}

final List<Filter> filtersPushedDownToBaseCursor = new ArrayList<>();
final List<Filter> filtersForPostUnnestCursor = new ArrayList<>();

void addPostFilterWithPreFilterIfRewritePossible(@Nullable final Filter filter, boolean skipPreFilters)
{
if (filter == null) {
return;
}
if (!skipPreFilters) {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(filter, inputColumn, inputColumnCapabilites);
if (newFilter != null) {
// Add the rewritten filter pre-unnest, so we get the benefit of any indexes, and so we avoid unnesting
// any rows that do not match this filter at all.
filtersPushedDownToBaseCursor.add(newFilter);
}
}
// Add original filter post-unnest no matter what: we need to filter out any extraneous unnested values.
filtersForPostUnnestCursor.add(filter);
}

void addPreFilter(@Nullable final Filter filter)
{
if (filter == null) {
return;
}

final Set<String> requiredColumns = filter.getRequiredColumns();

// Run filter post-unnest if it refers to any virtual columns. This is a conservative judgement call
// that perhaps forces the code to use a ValueMatcher where an index would've been available,
// which can have real performance implications. This is an interim choice made to value correctness
// over performance. When we need to optimize this performance, we should be able to
// create a VirtualColumnDatasource that contains all the virtual columns, in which case the query
// itself would stop carrying them and everything should be able to be pushed down.
if (queryVirtualColumns.getVirtualColumns().length > 0) {
for (String column : requiredColumns) {
if (queryVirtualColumns.exists(column)) {
filtersForPostUnnestCursor.add(filter);
return;
}
}
}
filtersPushedDownToBaseCursor.add(filter);

}

public void addToOriginalFilterCount(int c)
{
originalFilterCount += c;
}

public void addToPreFilterCount(int c)
{
preFilterCount += c;
}

public int getOriginalFilterCount()
{
return originalFilterCount;
}

public int getPreFilterCount()
{
return preFilterCount;
}
}

/**
* handles the nested rewrite for unnest columns in recursive way,
* it loops through all and/or filters and rewrite only required filters in the child and add it to preFilter if qualified
* or else skip adding it to preFilters.
* RULES:
* 1. Add to preFilters only when top level filter is AND.
* for example: a=1 and (b=2 or c=2) , In this case a=1 can be added as preFilters but we can not add b=2 as preFilters.
* 2. If Top level is OR filter then we can either choose to add entire top level OR filter to preFilter or skip it all together.
* for example: a=1 or (b=2 and c=2)
* 3. Filters on unnest column which is derived from Array or any other Expression can not be pushe down to base.
* for example: a=1 and vc=3 , lets say vc is ExpressionVirtualColumn, and vc=3 can not be push down to base even if top level is AND filter.
* A. Unnesting a single dimension e.g. select * from foo, UNNEST(MV_TO_ARRAY(dim3)) as u(d3)
* B. Unnesting an expression from multiple columns e.g. select * from foo, UNNEST(ARRAY[dim1,dim2]) as u(d12)
* In case A, d3 is a direct reference to dim3 so any filter using d3 can be rewritten using dim3 and added to pre filter
* while in case B, due to presence of the expression virtual column expressionVirtualColumn("j0.unnest", "array(\"dim1\",\"dim2\")", ColumnType.STRING_ARRAY)
* the filters on d12 cannot be pushed to the pre filters
*
* @param queryFilter query filter passed to makeCursors
* @param inputColumn input column to unnest if it's a direct access; otherwise null
* @param inputColumnCapabilites input column capabilities if known; otherwise null
*/
private List<Filter> recursiveRewriteOnUnnestFilters(
BooleanFilter queryFilter,
final String inputColumn,
final ColumnCapabilities inputColumnCapabilites,
final FilterSplitter filterSplitter,
final boolean isTopLevelAndFilter
)
{
final List<Filter> preFilterList = new ArrayList<>();
for (Filter filter : queryFilter.getFilters()) {
if (filter.getRequiredColumns().contains(outputColumnName)) {
if (filter instanceof AndFilter) {
preFilterList.add(new AndFilter(recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
isTopLevelAndFilter
)));
} else if (filter instanceof OrFilter) {
// in case of Or Fiters, we set isTopLevelAndFilter to false that prevents pushing down any child filters to base
List<Filter> orChildFilters = recursiveRewriteOnUnnestFilters(
(BooleanFilter) filter,
inputColumn,
inputColumnCapabilites,
filterSplitter,
false
);
preFilterList.add(new OrFilter(orChildFilters));
} else {
final Filter newFilter = rewriteFilterOnUnnestColumnIfPossible(
filter,
inputColumn,
inputColumnCapabilites
);
if (newFilter != null) {
// this is making sure that we are not pushing the unnest columns filters to base filter without rewriting.
preFilterList.add(newFilter);
filterSplitter.addToPreFilterCount(1);
}
/*
Push down the filters to base only if top level is And Filter
we can not push down if top level filter is OR or unnestColumn is derived expression like arrays
*/
if (isTopLevelAndFilter && getUnnestInputIfDirectAccess(unnestColumn) != null) {
filterSplitter.addPreFilter(newFilter != null ? newFilter : filter);
}
filterSplitter.addToOriginalFilterCount(1);
}
} else {
preFilterList.add(filter);
// for filters on non unnest columns, we still need to count the nested filters if any as we are not traversing it in this method
int filterCount = Filters.countNumberOfFilters(filter);
filterSplitter.addToOriginalFilterCount(filterCount);
filterSplitter.addToPreFilterCount(filterCount);
}
}
return preFilterList;
}
/**
* Returns the input of {@link #unnestColumn}, if it's a direct access; otherwise returns null.
*/
Expand Down Expand Up @@ -465,7 +577,6 @@ static boolean filterMapsOverMultiValueStrings(final Filter filter)
return false;
}
}

return true;
} else if (filter instanceof NotFilter) {
return filterMapsOverMultiValueStrings(((NotFilter) filter).getBaseFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.query.DefaultBitmapResultFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.BooleanFilter;
import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.DruidPredicateFactory;
Expand Down Expand Up @@ -422,4 +423,19 @@ private static LinkedHashSet<Filter> flattenOrChildren(final Collection<Filter>

return retVal;
}

public static int countNumberOfFilters(@Nullable Filter filter)
{
if (filter == null) {
return 0;
}
if (filter instanceof BooleanFilter) {
return ((BooleanFilter) filter).getFilters()
.stream()
.map(f -> countNumberOfFilters(f))
.mapToInt(Integer::intValue)
.sum();
}
return 1;
}
}
Loading

0 comments on commit 26d82fd

Please sign in to comment.