Skip to content

Commit

Permalink
Reset buffer aggregators when resetting Groupers. (apache#16296)
Browse files Browse the repository at this point in the history
Buffer aggregators can contain some cached objects within them, such as
Memory references or HLL Unions. Prior to this patch, various Grouper
implementations were not releasing this state when resetting their own
internal state, which could lead to excessive memory use.

This patch renames AggregatorAdapater#close to "reset", and updates
Grouper implementations to call this reset method whenever they reset
their internal state.

The base method on BufferAggregator and VectorAggregator remains named
"close", for compatibility with existing extensions, but the contract
is adjusted to say that the aggregator may be reused after the method
is called. All existing implementations in core already adhere to this
new contract, except for the ArrayOfDoubles build flavors, which are
updated in this patch to adhere.

Additionally, this patch harmonizes buffer sketch helpers to call their
clear method "clear" rather than a mix of "clear" and "close". (Others
were already using "clear".)
  • Loading branch information
gianm authored Apr 24, 2024
1 parent 1dabb02 commit 274ccbf
Show file tree
Hide file tree
Showing 22 changed files with 58 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public Object get(final ByteBuffer buf, final int position)
@Override
public void close()
{
helper.close();
helper.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void initializeEmptyUnion(ByteBuffer buf, int position)
}
}

public void close()
public void clear()
{
unions.clear();
memCache.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Object get(final ByteBuffer buf, final int position)
@Override
public void close()
{
helper.close();
helper.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public double getDouble(ByteBuffer buf, int position)
@Override
public void close()
{
helper.close();
helper.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt
/**
* Returns a {@link Union} associated with a particular buffer location.
*
* The Union object will be cached in this helper until {@link #close()} is called.
* The Union object will be cached in this helper until {@link #clear()} is called.
*/
public Union getOrCreateUnion(ByteBuffer buf, int position)
{
Expand All @@ -122,7 +122,7 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
return union;
}

public void close()
public void clear()
{
unions.clear();
memCache.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt
@Override
public void close()
{
helper.close();
helper.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.segment.data.IndexedInts;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -48,6 +47,7 @@ public class ArrayOfDoublesSketchBuildAggregator implements Aggregator
@Nullable
private ArrayOfDoublesUpdatableSketch sketch;

private final int nominalEntries;
private final boolean canLookupUtf8;
private final boolean canCacheById;
private final LinkedHashMap<Integer, Object> stringCache = new LinkedHashMap<Integer, Object>()
Expand All @@ -67,10 +67,7 @@ public ArrayOfDoublesSketchBuildAggregator(
{
this.keySelector = keySelector;
this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
values = new double[valueSelectors.size()];
sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(valueSelectors.size()).build();

this.nominalEntries = nominalEntries;
this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
}
Expand All @@ -83,6 +80,15 @@ public ArrayOfDoublesSketchBuildAggregator(
@Override
public void aggregate()
{
if (values == null) {
values = new double[valueSelectors.length];
}

if (sketch == null) {
sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
.setNumberOfValues(valueSelectors.length).build();
}

final IndexedInts keys = keySelector.getRow();
for (int i = 0; i < valueSelectors.length; i++) {
if (valueSelectors[i].isNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ public ArrayOfDoublesSketchBuildBufferAggregator(
this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
this.nominalEntries = nominalEntries;
this.maxIntermediateSize = maxIntermediateSize;
values = new double[valueSelectors.size()];

this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
}
Expand All @@ -92,6 +90,10 @@ public void init(final ByteBuffer buf, final int position)
@Override
public void aggregate(final ByteBuffer buf, final int position)
{
if (values == null) {
values = new double[valueSelectors.length];
}

for (int i = 0; i < valueSelectors.length; i++) {
if (valueSelectors[i].isNull()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
Expand All @@ -42,7 +41,7 @@
* (2) Query engines are freed from the need to manage how much space each individual aggregator needs. They only
* need to allocate a block of size "spaceNeeded".
*/
public class AggregatorAdapters implements Closeable
public class AggregatorAdapters
{
private static final Logger log = new Logger(AggregatorAdapters.class);

Expand Down Expand Up @@ -230,14 +229,14 @@ public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, Byt
}

/**
* Close all of our aggregators.
* Reset all of our aggregators, releasing resources held by them. After this, this instance may be reused or
* it may be discarded.
*/
@Override
public void close()
public void reset()
{
for (Adapter adapter : adapters) {
try {
adapter.close();
adapter.reset();
}
catch (Exception e) {
log.warn(e, "Could not close aggregator [%s], skipping.", adapter.getFactory().getName());
Expand All @@ -250,7 +249,7 @@ public void close()
* BufferAggregator and VectorAggregator. Private, since it doesn't escape this class and the
* only two implementations are private static classes below.
*/
private interface Adapter extends Closeable
private interface Adapter
{
void init(ByteBuffer buf, int position);

Expand All @@ -259,8 +258,7 @@ private interface Adapter extends Closeable

void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer);

@Override
void close();
void reset();

AggregatorFactory getFactory();

Expand Down Expand Up @@ -293,7 +291,7 @@ public Object get(final ByteBuffer buf, final int position)
}

@Override
public void close()
public void reset()
{
aggregator.close();
}
Expand Down Expand Up @@ -352,7 +350,7 @@ public Object get(final ByteBuffer buf, final int position)
}

@Override
public void close()
public void reset()
{
aggregator.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ default double getDouble(ByteBuffer buf, int position)
}

/**
* Release any resources used by the aggregator
* Release any resources used by the aggregator. The aggregator may be reused after this call, by calling
* {@link #init(ByteBuffer, int)} followed by other methods as normal.
*
* This call would be more properly named "reset", but we use the name "close" to improve compatibility with
* existing aggregator implementations in extensions.
*/
void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ default void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, By
}

/**
* Release any resources used by the aggregator.
* Release any resources used by the aggregator. The aggregator may be reused after this call, by calling
* {@link #init(ByteBuffer, int)} followed by other methods as normal.
*
* This call would be more properly named "reset", but we use the name "close" to improve compatibility with
* existing aggregator implementations in extensions.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public AggregateResult aggregate(KeyType key, int keyHash)
public void close()
{
keySerde.reset();
aggregators.close();
aggregators.reset();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public void reset()
{
// Clear the entire usedFlagBuffer
usedFlagMemory.clear();
aggregators.reset();
}

@Override
Expand All @@ -280,7 +281,7 @@ public IntGrouperHashFunction hashFunction()
@Override
public void close()
{
aggregators.close();
aggregators.reset();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void reset()
offsetList.reset();
hashTable.reset();
keySerde.reset();
aggregators.reset();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public void reset()
}

this.hashTable = createTable(buffer, tableStart, numBuckets);
this.aggregators.reset();
}

@Override
Expand Down Expand Up @@ -256,7 +257,7 @@ public void close()
@Override
public void close()
{
aggregators.close();
aggregators.reset();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void reset()
hashTable.reset();
keySerde.reset();
offsetHeap.reset();
aggregators.reset();
heapIndexUpdater.setHashTableBuffer(hashTable.getTableBuffer());
hasIterated = false;
offsetHeapIterableSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ private Sequence<Result<TimeseriesResultValue>> processVectorized(
}

final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
final AggregatorAdapters aggregators = closer.register(
AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs())
);
final AggregatorAdapters aggregators =
AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs());
closer.register(aggregators::reset);

final ResourceHolder<ByteBuffer> bufferHolder = closer.register(bufferPool.take());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void runWithCardinalityKnown(

updateResults(params, theDimValSelector, aggregatesStore, resultBuilder);

closeAggregators(aggregatesStore);
resetAggregators(aggregatesStore);

numProcessed += numToProcess;
params.getCursor().reset();
Expand Down Expand Up @@ -151,7 +151,7 @@ private void runWithCardinalityUnknown(
}
long processedRows = scanAndAggregate(params, null, aggregatesStore);
updateResults(params, null, aggregatesStore, resultBuilder);
closeAggregators(aggregatesStore);
resetAggregators(aggregatesStore);
params.getCursor().reset();
if (queryMetrics != null) {
queryMetrics.addProcessedRows(processedRows);
Expand Down Expand Up @@ -199,7 +199,7 @@ protected abstract void updateResults(
TopNResultBuilder resultBuilder
);

protected abstract void closeAggregators(
protected abstract void resetAggregators(
DimValAggregateStore dimValAggregateStore
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void updateResults(
}

@Override
protected void closeAggregators(TopNColumnAggregatesProcessor processor)
protected void resetAggregators(TopNColumnAggregatesProcessor processor)
{
processor.closeAggregators();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ protected void updateResults(
}

@Override
protected void closeAggregators(BufferAggregator[] bufferAggregators)
protected void resetAggregators(BufferAggregator[] bufferAggregators)
{
for (BufferAggregator agg : bufferAggregators) {
agg.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void updateResults(
}

@Override
protected void closeAggregators(Map<Object, Aggregator[]> stringMap)
protected void resetAggregators(Map<Object, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testCloseAggregatorAdaptorsShouldBeClosed()
);
grouper.initVectorized(512);
grouper.close();
Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
Mockito.verify(aggregatorAdapters, Mockito.times(2)).reset();
}

@Test
Expand Down

0 comments on commit 274ccbf

Please sign in to comment.