diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java index 8e3bfffa0737..0458b5084c43 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java @@ -75,7 +75,7 @@ public Object get(final ByteBuffer buf, final int position) @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java index 226530197725..1fa9ee4c9a3f 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java @@ -142,7 +142,7 @@ private void initializeEmptyUnion(ByteBuffer buf, int position) } } - public void close() + public void clear() { unions.clear(); memCache.clear(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java index 5fec9b94ba2d..31ad26cb5d7d 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java @@ -102,7 +102,7 @@ public Object get(final ByteBuffer buf, final int position) @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 34aae3f36e18..60d83f4e0a71 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -85,7 +85,7 @@ public double getDouble(ByteBuffer buf, int position) @Override public void close() { - helper.close(); + helper.clear(); } @Override diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java index 49856c9e80df..e2f699012a19 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java @@ -95,7 +95,7 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt /** * Returns a {@link Union} associated with a particular buffer location. * - * The Union object will be cached in this helper until {@link #close()} is called. + * The Union object will be cached in this helper until {@link #clear()} is called. */ public Union getOrCreateUnion(ByteBuffer buf, int position) { @@ -122,7 +122,7 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped) return union; } - public void close() + public void clear() { unions.clear(); memCache.clear(); diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java index a862265d561c..7d10bc30fb5e 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java @@ -107,6 +107,6 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt @Override public void close() { - helper.close(); + helper.clear(); } } diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java index 7ca1061889de..b093e730f0b3 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java @@ -27,7 +27,6 @@ import org.apache.druid.segment.data.IndexedInts; import javax.annotation.Nullable; - import java.nio.ByteBuffer; import java.util.LinkedHashMap; import java.util.List; @@ -48,6 +47,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator @Nullable private ArrayOfDoublesUpdatableSketch sketch; + private final int nominalEntries; private final boolean canLookupUtf8; private final boolean canCacheById; private final LinkedHashMap stringCache = new LinkedHashMap() @@ -67,10 +67,7 @@ public ArrayOfDoublesSketchBuildAggregator( { this.keySelector = keySelector; this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]); - values = new double[valueSelectors.size()]; - sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) - .setNumberOfValues(valueSelectors.size()).build(); - + this.nominalEntries = nominalEntries; this.canCacheById = this.keySelector.nameLookupPossibleInAdvance(); this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8(); } @@ -83,6 +80,15 @@ public ArrayOfDoublesSketchBuildAggregator( @Override public void aggregate() { + if (values == null) { + values = new double[valueSelectors.length]; + } + + if (sketch == null) { + sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries) + .setNumberOfValues(valueSelectors.length).build(); + } + final IndexedInts keys = keySelector.getRow(); for (int i = 0; i < valueSelectors.length; i++) { if (valueSelectors[i].isNull()) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java index 18906d129360..b925220c89fd 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java @@ -73,8 +73,6 @@ public ArrayOfDoublesSketchBuildBufferAggregator( this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]); this.nominalEntries = nominalEntries; this.maxIntermediateSize = maxIntermediateSize; - values = new double[valueSelectors.size()]; - this.canCacheById = this.keySelector.nameLookupPossibleInAdvance(); this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8(); } @@ -92,6 +90,10 @@ public void init(final ByteBuffer buf, final int position) @Override public void aggregate(final ByteBuffer buf, final int position) { + if (values == null) { + values = new double[valueSelectors.length]; + } + for (int i = 0; i < valueSelectors.length; i++) { if (valueSelectors[i].isNull()) { return; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java index 8ae7a33b08d6..25c9102bcf72 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java @@ -26,7 +26,6 @@ import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; -import java.io.Closeable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -42,7 +41,7 @@ * (2) Query engines are freed from the need to manage how much space each individual aggregator needs. They only * need to allocate a block of size "spaceNeeded". */ -public class AggregatorAdapters implements Closeable +public class AggregatorAdapters { private static final Logger log = new Logger(AggregatorAdapters.class); @@ -230,14 +229,14 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt } /** - * Close all of our aggregators. + * Reset all of our aggregators, releasing resources held by them. After this, this instance may be reused or + * it may be discarded. */ - @Override - public void close() + public void reset() { for (Adapter adapter : adapters) { try { - adapter.close(); + adapter.reset(); } catch (Exception e) { log.warn(e, "Could not close aggregator [%s], skipping.", adapter.getFactory().getName()); @@ -250,7 +249,7 @@ public void close() * BufferAggregator and VectorAggregator. Private, since it doesn't escape this class and the * only two implementations are private static classes below. */ - private interface Adapter extends Closeable + private interface Adapter { void init(ByteBuffer buf, int position); @@ -259,8 +258,7 @@ private interface Adapter extends Closeable void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer); - @Override - void close(); + void reset(); AggregatorFactory getFactory(); @@ -293,7 +291,7 @@ public Object get(final ByteBuffer buf, final int position) } @Override - public void close() + public void reset() { aggregator.close(); } @@ -352,7 +350,7 @@ public Object get(final ByteBuffer buf, final int position) } @Override - public void close() + public void reset() { aggregator.close(); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index e9fdbeaa061b..20d13491b0f6 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -158,7 +158,11 @@ default double getDouble(ByteBuffer buf, int position) } /** - * Release any resources used by the aggregator + * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling + * {@link #init(ByteBuffer, int)} followed by other methods as normal. + * + * This call would be more properly named "reset", but we use the name "close" to improve compatibility with + * existing aggregator implementations in extensions. */ void close(); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java index befff12ba6e0..a3e506e59c87 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java @@ -83,7 +83,11 @@ default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, By } /** - * Release any resources used by the aggregator. + * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling + * {@link #init(ByteBuffer, int)} followed by other methods as normal. + * + * This call would be more properly named "reset", but we use the name "close" to improve compatibility with + * existing aggregator implementations in extensions. */ void close(); } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java index f3bc195dcbdf..70cf5832cf33 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java @@ -170,7 +170,7 @@ public AggregateResult aggregate(KeyType key, int keyHash) public void close() { keySerde.reset(); - aggregators.close(); + aggregators.reset(); } /** diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java index 0fcb4ddeb2e0..616ac190dd83 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java @@ -269,6 +269,7 @@ public void reset() { // Clear the entire usedFlagBuffer usedFlagMemory.clear(); + aggregators.reset(); } @Override @@ -280,7 +281,7 @@ public IntGrouperHashFunction hashFunction() @Override public void close() { - aggregators.close(); + aggregators.reset(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java index 167b322b9d45..c4d046977168 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java @@ -158,6 +158,7 @@ public void reset() offsetList.reset(); hashTable.reset(); keySerde.reset(); + aggregators.reset(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java index ad12d0503f8f..e5c2801c3b74 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java @@ -205,6 +205,7 @@ public void reset() } this.hashTable = createTable(buffer, tableStart, numBuckets); + this.aggregators.reset(); } @Override @@ -256,7 +257,7 @@ public void close() @Override public void close() { - aggregators.close(); + aggregators.reset(); } @VisibleForTesting diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java index 756a8227f5e9..90a0e1e250d6 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java @@ -185,6 +185,7 @@ public void reset() hashTable.reset(); keySerde.reset(); offsetHeap.reset(); + aggregators.reset(); heapIndexUpdater.setHashTableBuffer(hashTable.getTableBuffer()); hasIterated = false; offsetHeapIterableSize = 0; diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java index 7ae290dd7d48..c5e83b84e87c 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java @@ -164,9 +164,9 @@ private Sequence> processVectorized( } final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - final AggregatorAdapters aggregators = closer.register( - AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs()) - ); + final AggregatorAdapters aggregators = + AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs()); + closer.register(aggregators::reset); final ResourceHolder bufferHolder = closer.register(bufferPool.take()); diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java index 843d248221e7..f34464a49d0e 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java @@ -120,7 +120,7 @@ private void runWithCardinalityKnown( updateResults(params, theDimValSelector, aggregatesStore, resultBuilder); - closeAggregators(aggregatesStore); + resetAggregators(aggregatesStore); numProcessed += numToProcess; params.getCursor().reset(); @@ -151,7 +151,7 @@ private void runWithCardinalityUnknown( } long processedRows = scanAndAggregate(params, null, aggregatesStore); updateResults(params, null, aggregatesStore, resultBuilder); - closeAggregators(aggregatesStore); + resetAggregators(aggregatesStore); params.getCursor().reset(); if (queryMetrics != null) { queryMetrics.addProcessedRows(processedRows); @@ -199,7 +199,7 @@ protected abstract void updateResults( TopNResultBuilder resultBuilder ); - protected abstract void closeAggregators( + protected abstract void resetAggregators( DimValAggregateStore dimValAggregateStore ); diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java index 14f3b729e1e5..ba5fbf251084 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java @@ -112,7 +112,7 @@ protected void updateResults( } @Override - protected void closeAggregators(TopNColumnAggregatesProcessor processor) + protected void resetAggregators(TopNColumnAggregatesProcessor processor) { processor.closeAggregators(); } diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java index 6ddda5eb1be8..d0c0fb064e03 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java @@ -768,7 +768,7 @@ protected void updateResults( } @Override - protected void closeAggregators(BufferAggregator[] bufferAggregators) + protected void resetAggregators(BufferAggregator[] bufferAggregators) { for (BufferAggregator agg : bufferAggregators) { agg.close(); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java index 70e01e49aed0..3b60bb65ee17 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -135,7 +135,7 @@ protected void updateResults( } @Override - protected void closeAggregators(Map stringMap) + protected void resetAggregators(Map stringMap) { for (Aggregator[] aggregators : stringMap.values()) { for (Aggregator agg : aggregators) { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java index d5a863a7542a..fd0314a607c4 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java @@ -45,7 +45,7 @@ public void testCloseAggregatorAdaptorsShouldBeClosed() ); grouper.initVectorized(512); grouper.close(); - Mockito.verify(aggregatorAdapters, Mockito.times(1)).close(); + Mockito.verify(aggregatorAdapters, Mockito.times(2)).reset(); } @Test