add equality, null, and range filter #14542

Merged (47 commits, Jul 18, 2023)
Changes from 4 commits
1ca9cb5
add equality, null, and range filter
clintropolis Jul 7, 2023
97d4c2b
more better
clintropolis Jul 7, 2023
899bf58
more
clintropolis Jul 7, 2023
b545dc2
more stuff
clintropolis Jul 7, 2023
e49a64a
fix stuff, more tests, etc
clintropolis Jul 10, 2023
cfcef49
adjust
clintropolis Jul 10, 2023
3defa8a
fix tests
clintropolis Jul 10, 2023
ab97b3e
remove ignored
clintropolis Jul 10, 2023
3209790
adjust
clintropolis Jul 10, 2023
e9d7e04
fixes
clintropolis Jul 10, 2023
92a3fce
fix
clintropolis Jul 10, 2023
b141f56
Merge remote-tracking branch 'upstream/master' into nicer-filters
clintropolis Jul 10, 2023
328d65e
more test
clintropolis Jul 10, 2023
2d6e9ec
javadoc for sql test filter functions
clintropolis Jul 10, 2023
2af2350
range filter support for arrays, tons more tests, fixes
clintropolis Jul 11, 2023
77a95f1
add dimension selector tests for mixed type roots
clintropolis Jul 11, 2023
f5d4f74
style
clintropolis Jul 11, 2023
da894b4
more coverage maybe
clintropolis Jul 11, 2023
acb18a9
deprecated is lies, at least for this test...
clintropolis Jul 11, 2023
646f4b8
Merge remote-tracking branch 'upstream/master' into nicer-filters
clintropolis Jul 11, 2023
2e9dc0b
fix build
clintropolis Jul 11, 2023
cc80896
fix
clintropolis Jul 11, 2023
1bd0412
adjust
clintropolis Jul 11, 2023
71860ff
fix
clintropolis Jul 11, 2023
c74af4a
fix style
clintropolis Jul 11, 2023
c174905
support json equality
clintropolis Jul 11, 2023
a02bff9
some adjustments, still more to do
clintropolis Jul 12, 2023
574b1e2
opt-in to array processing at ingest time for sketchy stuff
clintropolis Jul 12, 2023
3b235ed
add cooler equality index, fix missing string utf8 index supplier
clintropolis Jul 13, 2023
1b13fc7
style
clintropolis Jul 13, 2023
1d01db5
sql tests
clintropolis Jul 13, 2023
e7cb77c
adjustment
clintropolis Jul 13, 2023
fcc8908
Merge remote-tracking branch 'upstream/master' into nicer-filters
clintropolis Jul 13, 2023
7efcd83
remove agg changes in favor of splitting into a separate PR
clintropolis Jul 14, 2023
7a14e89
simplify
clintropolis Jul 14, 2023
8e46dbc
adjustments
clintropolis Jul 14, 2023
aed2016
refactor a bunch of stuff
clintropolis Jul 15, 2023
bd8eecb
revert
clintropolis Jul 15, 2023
5a00b53
remove bloom filter array support for now
clintropolis Jul 16, 2023
c72f98c
fix test
clintropolis Jul 17, 2023
4806c01
remove unused
clintropolis Jul 17, 2023
0e02f31
expression array comparator stuff
clintropolis Jul 17, 2023
6b33e42
style
clintropolis Jul 17, 2023
f8e5284
missed one
clintropolis Jul 17, 2023
65ad8e4
fix it
clintropolis Jul 17, 2023
affcf7a
backwards compat
clintropolis Jul 18, 2023
927fd71
consistent naming
clintropolis Jul 18, 2023
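
This PR adds three new native filter types (equality, null, and range) and switches the SQL test expectations over to them: in the diffs below, the old selector() and bound() helper calls are replaced by the new equality() and notNull() helpers. As a rough sketch of what the new filter specs might look like in a native query filter clause (the type and property names here are inferred from this PR's filter names and helper calls, not taken from finalized documentation):

    {
      "type": "and",
      "fields": [
        { "type": "equals", "column": "dim2", "matchValueType": "LONG", "matchValue": 0 },
        { "type": "range", "column": "dim1", "matchValueType": "STRING", "lower": "abc", "upper": "def" },
        { "type": "not", "field": { "type": "null", "column": "dim3" } }
      ]
    }

Unlike the older selector and bound filters, which compare values as strings, the new filters carry an explicit match value type; that is why expectations such as bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC) become equality("dim2", 0L, ColumnType.LONG) in the tests below.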
@@ -26,12 +26,12 @@
import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.column.BitmapColumnIndex;
import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
import org.apache.druid.segment.column.StringValueSetIndex;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.IndexedUtf8ValueSetIndex;
import org.apache.druid.segment.index.StringValueSetIndex;
import org.apache.druid.segment.serde.StringUtf8ColumnIndexSupplier;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -38,7 +38,6 @@
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
@@ -444,7 +443,7 @@ public void testEmptyTimeseriesResults()
Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(equality("dim2", 0L, ColumnType.LONG))
.granularity(Granularities.ALL)
.aggregators(ImmutableList.of(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
@@ -476,19 +475,19 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a0:agg", "m1", TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new TDigestSketchAggregatorFactory("a1:agg", "qsketch_m1", 100),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
@@ -21,13 +21,17 @@

import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnProcessorFactory;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.data.IndexedInts;

import java.util.function.Consumer;
@@ -96,6 +100,25 @@ public Consumer<Supplier<HllSketch>> makeLongProcessor(BaseLongColumnValueSelect
};
}

@Override
public Consumer<Supplier<HllSketch>> makeArrayProcessor(
BaseObjectColumnValueSelector<?> selector,
ColumnCapabilities columnCapabilities
)
{
final ExpressionType expressionType = ExpressionType.fromColumnType(columnCapabilities);
final NullableTypeStrategy<Object> strategy = expressionType.getNullableStrategy();
return sketch -> {
final Object o = selector.getObject();
if (o != null) {
byte[] bytes = ExprEval.toBytes(expressionType, strategy, o);
sketch.get().update(bytes);
}
};
}



@Override
public Consumer<Supplier<HllSketch>> makeComplexProcessor(BaseObjectColumnValueSelector<?> selector)
{
@@ -19,15 +19,22 @@

package org.apache.druid.query.aggregation.datasketches.hll.vector;

import org.apache.datasketches.hll.HllSketch;
import org.apache.druid.java.util.common.StringEncoding;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildBufferAggregatorHelper;
import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public class HllSketchBuildVectorProcessorFactory implements VectorColumnProcessorFactory<HllSketchBuildVectorProcessor>
{
private final HllSketchBuildBufferAggregatorHelper helper;
@@ -83,6 +90,49 @@ public HllSketchBuildVectorProcessor makeLongProcessor(ColumnCapabilities capabi
return new LongHllSketchBuildVectorProcessor(helper, selector);
}

@Override
public HllSketchBuildVectorProcessor makeArrayProcessor(
ColumnCapabilities capabilities,
VectorObjectSelector selector
)
{
final ExpressionType expressionType = ExpressionType.fromColumnType(capabilities);
final NullableTypeStrategy<Object> typeStrategy = expressionType.getNullableStrategy();
return new HllSketchBuildVectorProcessor()
{
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final Object[] vector = selector.getObjectVector();
final HllSketch sketch = helper.getSketchAtPosition(buf, position);

for (int i = startRow; i < endRow; i++) {
if (vector[i] != null) {
byte[] bytes = ExprEval.toBytes(expressionType, typeStrategy, vector[i]);
sketch.update(bytes);
}
}
}

@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
final Object[] vector = selector.getObjectVector();

for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;
final int position = positions[i] + positionOffset;
final HllSketch sketch = helper.getSketchAtPosition(buf, position);

if (vector[idx] != null) {
byte[] bytes = ExprEval.toBytes(expressionType, typeStrategy, vector[idx]);
sketch.update(bytes);
}
}
}
};
}

@Override
public HllSketchBuildVectorProcessor makeObjectProcessor(
ColumnCapabilities capabilities,
@@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
@@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllDoublesSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
@@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllDoublesSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
@@ -179,7 +179,7 @@ public VectorAggregator makeSingleValueDimensionProcessor(
SingleValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
@@ -188,7 +188,7 @@ public VectorAggregator makeMultiValueDimensionProcessor(
MultiValueDimensionVectorSelector selector
)
{
return new KllSketchNoOpBufferAggregator<KllFloatsSketch>(getEmptySketch());
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
@@ -209,6 +209,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new KllFloatsSketchBuildVectorAggregator(selector, getK(), getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new KllSketchNoOpBufferAggregator<>(getEmptySketch());
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
@@ -208,6 +208,12 @@ public VectorAggregator makeLongProcessor(ColumnCapabilities capabilities, Vecto
return new DoublesSketchBuildVectorAggregator(selector, k, getMaxIntermediateSizeWithNulls());
}

@Override
public VectorAggregator makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return new NoopDoublesSketchBufferAggregator();
}

@Override
public VectorAggregator makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
@@ -133,6 +133,12 @@ public Supplier<Object[]> makeLongProcessor(ColumnCapabilities capabilities, Vec
};
}

@Override
public Supplier<Object[]> makeArrayProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
return selector::getObjectVector;
}

@Override
public Supplier<Object[]> makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
{
@@ -175,7 +175,10 @@ public class HllSketchSqlAggregatorTest extends BaseCalciteQueryTest
private static final List<AggregatorFactory> EXPECTED_FILTERED_AGGREGATORS =
EXPECTED_PA_AGGREGATORS.stream()
.limit(5)
.map(factory -> new FilteredAggregatorFactory(factory, selector("dim2", "a", null)))
.map(factory -> new FilteredAggregatorFactory(
factory,
equality("dim2", "a", ColumnType.STRING)
))
.collect(Collectors.toList());

/**
@@ -344,7 +347,7 @@ public void testApproxCountDistinctHllSketch()
new HllSketchBuildAggregatorFactory("a1", "dim2", null, null, null, null, ROUND),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a2", "dim2", null, null, null, null, ROUND),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("dim2", "", null))
not(equality("dim2", "", ColumnType.STRING))
),
new HllSketchBuildAggregatorFactory("a3", "v0", null, null, null, null, ROUND),
new HllSketchBuildAggregatorFactory("a4", "v1", null, null, null, null, ROUND),
@@ -436,7 +439,7 @@ public void testAvgDailyCountDistinctHllSketch()
new LongSumAggregatorFactory("_a0:sum", "a0"),
new FilteredAggregatorFactory(
new CountAggregatorFactory("_a0:count"),
BaseCalciteQueryTest.not(BaseCalciteQueryTest.selector("a0", null, null))
notNull("a0")
)
)
)
@@ -480,7 +483,7 @@ public void testApproxCountDistinctHllSketchIsRounded()
new HllSketchBuildAggregatorFactory("a0", "m1", null, null, null, true, true)
)
)
.setHavingSpec(having(selector("a0", "2", null)))
.setHavingSpec(having(equality("a0", 2L, ColumnType.LONG)))
.setContext(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -852,7 +855,11 @@ public void testEmptyTimeseriesResults()
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC))
.filters(
NullHandling.replaceWithDefault()
? bound("dim2", "0", "0", false, false, null, StringComparators.NUMERIC)
: equality("dim2", 0L, ColumnType.LONG)
)
.granularity(Granularities.ALL)
.aggregators(
aggregators(
@@ -895,7 +902,7 @@ public void testGroupByAggregatorDefaultValues()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
@@ -911,7 +918,7 @@ public void testGroupByAggregatorDefaultValues()
null,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory(
@@ -923,7 +930,7 @@ public void testGroupByAggregatorDefaultValues()
false,
true
),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)
@@ -954,19 +961,19 @@ public void testGroupByAggregatorDefaultValuesFinalizeOuterSketches()
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setDimFilter(selector("dim2", "a", null))
.setDimFilter(equality("dim2", "a", ColumnType.STRING))
.setGranularity(Granularities.ALL)
.setVirtualColumns(expressionVirtualColumn("v0", "'a'", ColumnType.STRING))
.setDimensions(new DefaultDimensionSpec("v0", "d0", ColumnType.STRING))
.setAggregatorSpecs(
aggregators(
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a0", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
),
new FilteredAggregatorFactory(
new HllSketchBuildAggregatorFactory("a1", "v0", null, null, null, null, true),
selector("dim1", "nonexistent", null)
equality("dim1", "nonexistent", ColumnType.STRING)
)
)
)