-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integer Tuple Sketch support #10427
Integer Tuple Sketch support #10427
Conversation
This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type. The added classes currently support `Sum` as the semigroup, but are generic so others can be added. Feature breakdown: 1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, eg. `toIntegerSumTupleSketch(colA, colbB, 16)` 2. Add Codecs that use the Datasketches serialization 3. Add aggregation functions: * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side 4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations 5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
I could do with some advice on the best place to add tests for the aggregation functions, I've been looking through the existing tests and can't find anywhere suitable |
Codecov Report
@@ Coverage Diff @@
## master #10427 +/- ##
=============================================
- Coverage 70.30% 13.64% -56.66%
+ Complexity 6494 439 -6055
=============================================
Files 2158 2110 -48
Lines 116070 113782 -2288
Branches 17566 17294 -272
=============================================
- Hits 81608 15531 -66077
- Misses 28778 96979 +68201
+ Partials 5684 1272 -4412
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 1699 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
Show resolved
Hide resolved
|
||
@Override | ||
public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) { | ||
return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious to know if there's a reason why we have 2 ser/deser utilities (CustomSerDeUtils, ObjectSerDeUtils) ? @Jackie-Jiang
...nt-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good @andimiller!
@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) { | |||
return ObjectType.VarianceTuple; | |||
} else if (value instanceof PinotFourthMoment) { | |||
return ObjectType.PinotFourthMoment; | |||
} else if (value instanceof org.apache.datasketches.tuple.Sketch) { | |||
return ObjectType.IntegerTupleSketch; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a safe assumption? Is it also necessary to inspect the summary type to verify integer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now it is, but to add other types of tuple Sketch we'd need to add wrapper types, due to JVM type erasure
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
Show resolved
Hide resolved
...c/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
Outdated
Show resolved
Hide resolved
} | ||
double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate(); | ||
return Double.valueOf(estimate).longValue(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the serde always deserialise bytes to a compact sketch? It could be better to use the base Sketch
abstraction for cases where the sketches have been created outside the system and not compacted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can give that a go, I swapped it to all compact because I was having issues with the non-threadsafe nature of Sketch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same question as well :).
...he/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
Outdated
Show resolved
Hide resolved
...g/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java
Outdated
Show resolved
Hide resolved
...g/apache/pinot/core/query/aggregation/function/AvgIntegerTupleSketchAggregationFunction.java
Outdated
Show resolved
Hide resolved
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
Outdated
Show resolved
Hide resolved
...nt-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
Show resolved
Hide resolved
@@ -96,6 +96,9 @@ public static class Helix { | |||
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html | |||
public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536; | |||
|
|||
|
|||
public static final int DEFAULT_TUPLE_SKETCH_LGK = 16; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any references that can help explain this value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a comment, it's the same as the theta one above, but log 2
is.update((String) key, value); | ||
} else if (key instanceof byte[]) { | ||
is.update((byte[]) key, value); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case you want to validate/catch invalid types, consider throwing an IllegalStateException/illegalArg exception ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, added it for theta too and expanded the tests to cover
import org.apache.pinot.spi.data.FieldSpec.DataType; | ||
|
||
|
||
public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the raw type (R) be Sketch, instead of byte[] here ? Looking at the other sketch implementation (DistinctCountThetaSketchValueAggregator), which has Object as the raw type, I just wanted to check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be but Sketch
isn't thread-safe, and I swapped this to byte[]
while hunting down some thread safety issues, I will see if I can swap it back now that I've made all the Union
use thread safe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may need to be Object, this was a good catch, doing more local testing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have tested it more locally, it is fine being byte[]
because we only handle aggregated sketches
@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio | |||
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS); | |||
case FOURTHMOMENT: | |||
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT); | |||
case DISTINCTCOUNTTUPLESKETCH: | |||
// mode actually doesn't matter here because we only care about keys, not values | |||
return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason why we pass IntegerSummary.Mode.Sum as a parameter ? We are already differentiating based on the aggregation implementations IntegerTupleSketchAggregationFunction vs AvgIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is the mode for IntegerSummary
merging, all of these use Sum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so there can be functions that can use other summary modes (min, max..) in the future.
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
Outdated
Show resolved
Hide resolved
import org.apache.pinot.segment.spi.AggregationFunctionType; | ||
|
||
|
||
public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would composition + delegation make the APIs for Sum, Avg, distinct clearer than inheritance ? That way we know when/how IntegerTupleSketchAggregationFunction is exactly used and it'll decouple the Integer API from the rest.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've followed the way it was implemented for Theta, using the simplest one as the base and inheriting it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes makes sense to keep them consistent.
...che/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
Show resolved
Hide resolved
.../org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
Show resolved
Hide resolved
} | ||
ArrayList<CompactSketch<IntegerSummary>> merged = | ||
new ArrayList<>(intermediateResult1.size() + intermediateResult2.size()); | ||
merged.addAll(intermediateResult1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious - We dont want to do a union here for the merge? Im looking at DistinctCountThetaSketchAggregationFunction for reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an optimisation similar to the one used in the Theta version, where merges can be quite expensive, and it's better to delay the merge til we have a lot of sketches to combine, hence using List
as the intermediate type
} | ||
} | ||
} catch (Exception e) { | ||
throw new RuntimeException("Caught exception while merging Tuple Sketches", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is groupBy and not merging tuple sketches ?
byte[] value = valueArray[i]; | ||
CompactSketch<IntegerSummary> newSketch = | ||
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact(); | ||
for (int groupKey : groupKeysArray[i]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks exactly the same as aggregateGroupBySV except that we iterate over group keys as it can be multivalued ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
import org.apache.pinot.segment.spi.AggregationFunctionType; | ||
|
||
|
||
public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes makes sense to keep them consistent.
} | ||
double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate(); | ||
return Double.valueOf(estimate).longValue(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same question as well :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Can you please rebase and resolve the conflict, and also respond to the pending comments?
Thanks for taking care of comments ! |
This adds support for
BYTES
columns containing Tuple Sketches with Integer as the summary type.The added classes currently support
Sum
as the semigroup, but are generic so others can be added.Feature breakdown:
toIntegerSumTupleSketch(colA, colbB, 16)
DISTINCT_COUNT_TUPLE_SKETCH
will just get the estimate for the number of unique keys, same as Theta or HLLDISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH
will merge the sketches usingSum
as the semigroup and return the raw sketchSUM_VALUES_INTEGER_SUM_TUPLE_SKETCH
will merge the sketches usingSum
as the semigroup and estimate the sum of the value sideAVG_VALUES_INTEGER_SUM_TUPLE_SKETCH
will merge the sketches usingSum
as the semigroup and estimate the average of the value sideValueAggregator<_, _>
s for use inStarTree
indexes for all 4 above aggregationsValueAggregator
s for use in rollups for all 4 above aggregations