From d72d65e8e512a236183252a8cbb8706d06fd07bc Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 28 Apr 2016 07:53:45 -0700 Subject: [PATCH] Sketch cache key should include size, isInputThetaSketch. --- .../theta/SketchAggregatorFactory.java | 7 +++- .../theta/SketchMergeAggregatorFactory.java | 20 ++++++++--- .../theta/SketchAggregationTest.java | 33 +++++++++++++++++++ 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 1e1047d9c7dd..457d5f6b12b5 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.primitives.Doubles; +import com.google.common.primitives.Ints; import com.metamx.common.IAE; import com.yahoo.sketches.Family; import com.yahoo.sketches.Util; @@ -169,7 +170,11 @@ public List requiredFields() public byte[] getCacheKey() { byte[] fieldNameBytes = fieldName.getBytes(); - return ByteBuffer.allocate(1 + fieldNameBytes.length).put(cacheId).put(fieldNameBytes).array(); + return ByteBuffer.allocate(1 + Ints.BYTES + fieldNameBytes.length) + .put(cacheId) + .putInt(size) + .put(fieldNameBytes) + .array(); } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index 68383dd629ff..401c7da134e1 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -25,6 +25,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -104,7 +105,7 @@ public boolean getIsInputThetaSketch() { return isInputThetaSketch; } - + @JsonProperty public Integer getErrorBoundsStdDev() { @@ -126,9 +127,9 @@ public Object finalizeComputation(Object object) Sketch sketch = (Sketch) object; if (errorBoundsStdDev != null) { SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( - sketch.getEstimate(), - sketch.getUpperBound(errorBoundsStdDev), - sketch.getLowerBound(errorBoundsStdDev), + sketch.getEstimate(), + sketch.getUpperBound(errorBoundsStdDev), + sketch.getLowerBound(errorBoundsStdDev), errorBoundsStdDev); return result; } else { @@ -149,6 +150,17 @@ public String getTypeName() } } + @Override + public byte[] getCacheKey() + { + final byte[] superCacheKey = super.getCacheKey(); + + return ByteBuffer.allocate(superCacheKey.length + 1) + .put(superCacheKey) + .put(isInputThetaSketch ? (byte) 1 : (byte) 0) + .array(); + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index 2266e8f227a0..ad1c79be6b14 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -46,6 +46,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.List; /** @@ -384,6 +385,38 @@ public void testSketchSetPostAggregatorSerde() throws Exception ); } + @Test + public void testCacheKey() + { + final SketchMergeAggregatorFactory factory1 = new SketchMergeAggregatorFactory( + "name", + "fieldName", + 16, + null, + null, + null + ); + final SketchMergeAggregatorFactory factory2 = new SketchMergeAggregatorFactory( + "name", + "fieldName", + 16, + null, + null, + null + ); + final SketchMergeAggregatorFactory factory3 = new SketchMergeAggregatorFactory( + "name", + "fieldName", + 32, + null, + null, + null + ); + + Assert.assertTrue(Arrays.equals(factory1.getCacheKey(), factory2.getCacheKey())); + Assert.assertFalse(Arrays.equals(factory1.getCacheKey(), factory3.getCacheKey())); + } + private void assertPostAggregatorSerde(PostAggregator agg) throws Exception { Assert.assertEquals(