Skip to content

Commit

Permalink
add equality, null, and range filter (apache#14542)
Browse files Browse the repository at this point in the history
changes:
* new filters that preserve match value typing to better handle filtering different column types
* sql planner uses new filters by default in sql compatible null handling mode
* remove isFilterable from column capabilities
* proper handling of array filtering, add array processor to column processors
* javadoc for sql test filter functions
* range filter support for arrays, tons more tests, fixes
* add dimension selector tests for mixed type roots
* support json equality
* rename semantic index maker thingys to mostly have plural names since they typically make many indexes, e.g. StringValueSetIndex -> StringValueSetIndexes
* add cooler equality index maker, ValueIndexes 
* fix missing string utf8 index supplier
* expression array comparator stuff
  • Loading branch information
clintropolis authored and sergioferragut committed Jul 21, 2023
1 parent 14b3385 commit 81209df
Show file tree
Hide file tree
Showing 197 changed files with 9,784 additions and 2,206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.IndexedUtf8ValueIndexes;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.serde.StringUtf8ColumnIndexSupplier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class DictionaryEncodedStringIndexSupplierBenchmark
public static class BenchmarkState
{
@Nullable
private IndexedUtf8ValueSetIndex<?> stringValueSetIndex;
private IndexedUtf8ValueIndexes<?> stringValueSetIndex;
private final TreeSet<ByteBuffer> values = new TreeSet<>();
private static final int START_INT = 10_000_000;

Expand Down Expand Up @@ -112,7 +112,7 @@ public void setup()
);
StringUtf8ColumnIndexSupplier<?> indexSupplier =
new StringUtf8ColumnIndexSupplier<>(bitmapFactory, dictionaryUtf8::singleThreaded, bitmaps, null);
stringValueSetIndex = (IndexedUtf8ValueSetIndex<?>) indexSupplier.as(StringValueSetIndex.class);
stringValueSetIndex = (IndexedUtf8ValueIndexes<?>) indexSupplier.as(StringValueSetIndexes.class);
List<Integer> filterValues = new ArrayList<>();
List<Integer> nonFilterValues = new ArrayList<>();
for (int i = 0; i < dictionarySize; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
Expand Down Expand Up @@ -444,7 +443,7 @@ public void testEmptyTimeseriesResults()
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(numericEquality("dim2", 0L, ColumnType.LONG))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
Expand Down Expand Up @@ -476,19 +475,19 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a1:agg", "qsketch_m1", 100),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.Aggregator;
Expand Down Expand Up @@ -221,6 +222,8 @@ private HllSketchUpdater formulateSketchUpdater(ColumnSelectorFactory columnSele
}
};
break;
case ARRAY:
throw InvalidInput.exception("ARRAY types are not supported for hll sketch");
default:
updater = sketch -> {
Object obj = selector.getObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.aggregation.datasketches.hll.vector;

import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;
import org.apache.druid.segment.VectorColumnProcessorFactory;
Expand Down Expand Up @@ -83,6 +84,15 @@ public HllSketchBuildVectorProcessor makeLongProcessor(ColumnCapabilities capabi
return new LongHllSketchBuildVectorProcessor(helper, selector);
}

@Override
public HllSketchBuildVectorProcessor makeArrayProcessor(
ColumnCapabilities capabilities,
VectorObjectSelector selector
)
{
throw DruidException.defensive("ARRAY types are not supported for hll sketch");
}

@Override
public HllSketchBuildVectorProcessor makeObjectProcessor(
ColumnCapabilities capabilities,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllDoublesSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
Expand All @@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllFloatsSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new NoopDoublesSketchBufferAggregator();
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Union;
import org.apache.datasketches.thetacommon.ThetaUtil;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
Expand All @@ -39,6 +40,7 @@
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -78,13 +80,21 @@ public SketchAggregatorFactory(String name, String fieldName, Integer size, byte
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchAggregator(selector, size);
}

@Override
public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
final SketchAggregator aggregator = new SketchAggregator(selector, size);
return new AggregatorAndSize(aggregator, aggregator.getInitialSizeBytes());
Expand All @@ -94,6 +104,10 @@ public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities capabilities = metricFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isArray()) {
throw InvalidInput.exception("ARRAY types are not supported for theta sketch");
}
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public Supplier<Object[]> makeLongProcessor(ColumnCapabilities capabilities, Vec
};
}

@Override
public Supplier<Object[]> makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return selector::getObjectVector;
}

@Override
public Supplier<Object[]> makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
private static final List<AggregatorFactory> EXPECTED_FILTERED_AGGREGATORS =
EXPECTED_PA_AGGREGATORS.stream()
.limit(5)
.map(factory -> new FilteredAggregatorFactory(factory, selector("dim2", "a", null)))
.map(factory -> new FilteredAggregatorFactory(factory, equality("dim2", "a", ColumnType.STRING)))
.collect(Collectors.toList());

/**
Expand Down Expand Up @@ -344,7 +344,7 @@ public void testApproxCountDistinctHllSketch()
new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
not(equality("dim2", "", ColumnType.STRING))
),
new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND),
new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND),
Expand Down Expand Up @@ -436,7 +436,7 @@ public void testAvgDailyCountDistinctHllSketch()
new LongSumAggregatorFactory("_a0:sum", "a0"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("a0", null, null))
notNull("a0")
)
)
)
Expand Down Expand Up @@ -480,7 +480,7 @@ public void testApproxCountDistinctHllSketchIsRounded()
new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true)
)
)
.setHavingSpec(having(selector("a0", "2", null)))
.setHavingSpec(having(equality("a0", 2L, ColumnType.LONG)))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
Expand Down Expand Up @@ -852,7 +852,7 @@ public void testEmptyTimeseriesResults()
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(numericEquality("dim2", 0L, ColumnType.LONG))
.granularity(Granularities.ALL)
.aggregators(
aggregators(
Expand Down Expand Up @@ -895,7 +895,7 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
Expand All @@ -911,7 +911,7 @@ public void testGroupByAggregatorDefaultValues()
null,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory(
Expand All @@ -923,7 +923,7 @@ public void testGroupByAggregatorDefaultValues()
false,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down Expand Up @@ -954,19 +954,19 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "_d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
Expand Down
Loading

0 comments on commit 81209df

Please sign in to comment.