Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorize earliest aggregator for both numeric and string types #14408

Merged
merged 26 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2b556f6
Vectorizing earliest for numeric
somu-imply Jun 12, 2023
59118ae
Vectorizing earliest string aggregator
somu-imply Jun 12, 2023
6c139de
checkstyle fix
somu-imply Jun 12, 2023
556b3de
Removing unnecessary exceptions
somu-imply Jun 13, 2023
cf6fe0f
Ignoring tests in MSQ as earliest is not supported for numeric there
somu-imply Jun 16, 2023
df3db6e
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Jun 16, 2023
cf88e00
Fixing benchmarks
somu-imply Jun 16, 2023
a9a6fc2
Updating tests as MSQ does not support earliest for some cases
somu-imply Jun 19, 2023
5f65c42
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Jul 7, 2023
f78ca05
Addressing review comments by adding the following:
somu-imply Jul 7, 2023
6cae490
Addressing issues for dictionary encoded single string columns where …
somu-imply Jul 13, 2023
ef87989
Adding a flag for multi value dimension selector
somu-imply Jul 13, 2023
4c5813d
Addressing comments
somu-imply Jul 19, 2023
67fce5f
1 more change
somu-imply Jul 19, 2023
ccfd600
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Aug 7, 2023
1c372af
Handling review comments part 1
somu-imply Aug 7, 2023
aa97181
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Aug 15, 2023
f585412
Handling review comments and correctness fix for latest_by when the t…
somu-imply Aug 16, 2023
4291709
Updating numeric first vector agg
somu-imply Aug 17, 2023
44c3a4c
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Aug 17, 2023
890c865
Revert "Updating numeric first vector agg"
somu-imply Aug 21, 2023
6e75540
Updating code for correctness issues
somu-imply Aug 21, 2023
3664b87
fixing an issue with latest agg
somu-imply Aug 22, 2023
83a784a
Adding more comments and removing an unnecessary check
somu-imply Aug 22, 2023
47762c4
Merge remote-tracking branch 'upstream/master' into vectorize_earlies…
somu-imply Aug 25, 2023
f4ddb7c
Addressing null checks for tie selector and only vectorize false for …
somu-imply Aug 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,15 @@ public String getFormatString()
"SELECT LATEST(float3), LATEST(long1), LATEST(double4) FROM foo",
// 42,43: filter numeric nulls
"SELECT SUM(long5) FROM foo WHERE long5 IS NOT NULL",
"SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1"
"SELECT string2, SUM(long5) FROM foo WHERE long5 IS NOT NULL GROUP BY 1",
// 44: EARLIEST aggregator
"SELECT EARLIEST(long1) FROM foo",
// 45: EARLIEST aggregator double
"SELECT EARLIEST(double4) FROM foo",
// 46: EARLIEST aggregator float
"SELECT EARLIEST(float3) FROM foo",
// 47: EARLIEST aggregator all
"SELECT EARLIEST(float3), EARLIEST(long1), EARLIEST(double4) FROM foo"
);

@Param({"5000000"})
Expand Down Expand Up @@ -273,7 +281,11 @@ public String getFormatString()
"40",
"41",
"42",
"43"
"43",
"44",
"45",
"46",
"47"
})
private String query;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -97,6 +104,12 @@ public DoubleFirstAggregatorFactory(
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
}

/**
 * Reports whether this factory supports vectorized aggregation. The answer is unconditionally
 * {@code true}; {@code columnInspector} is not consulted.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}

@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
Expand Down Expand Up @@ -125,6 +138,23 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}

/**
 * Builds the vectorized aggregator for {@code fieldName}. A {@link DoubleFirstVectorAggregator} is
 * returned when the column capabilities are unknown ({@code null}) or numeric; any other column type
 * gets the double nil aggregator.
 */
@Override
public VectorAggregator factorizeVector(
VectorColumnSelectorFactory columnSelectorFactory
)
{
// NOTE(review): both selectors below are created before the capabilities check, so they are built
// even on the path that returns the nil aggregator. The DoubleFirstVectorAggregator constructor
// accepts a plain VectorValueSelector, so the downcast of the time selector is not required by it.
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
timeColumn);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two things:

  1. you don't need either of these until after you've checked capabilities. Don't bother creating them if you don't need them.
  2. This is casting to BaseLongVectorValueSelector, but the arguments on DoubleFirstVectorAggregator don't seem to care about the cast at all. Either it's important that we cast and we force the case, OR it's not important and we shouldn't force the case. The current code makes me think that it's not important.

if (capabilities == null || capabilities.isNumeric()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the vectorized engine, capabilities being null means the column doesn't exist, and so you can use the nil aggregation i think?

return new DoubleFirstVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.doubleNilVectorAggregator();
}
}

@Override
public Comparator getComparator()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.first;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
* Vectorized version of on heap 'earliest' aggregator for column selectors with type DOUBLE.
*/
public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator
{
double firstValue;

/**
 * Creates the aggregator, passing both selectors to {@link NumericFirstVectorAggregator} and
 * starting {@code firstValue} at zero.
 *
 * @param timeSelector  selector handed to the superclass as the time selector
 * @param valueSelector selector handed to the superclass as the value selector
 */
public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
{
super(timeSelector, valueSelector);
firstValue = 0;
}

/**
 * Writes a double zero into {@code buf} at {@code position}.
 */
@Override
public void initValue(ByteBuffer buf, int position)
{
buf.putDouble(position, 0);
}


/**
 * Reads element {@code index} of the value selector's current double vector, remembers it in
 * {@code firstValue}, and writes it into {@code buf} at {@code position}.
 */
@Override
void putValue(ByteBuffer buf, int position, int index)
{
  final double[] values = valueSelector.getDoubleVector();
  firstValue = values[index];
  buf.putDouble(position, firstValue);
}


/**
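* @return a {@code SerializablePair} of the long stored at {@code position} and the double stored at {@code position + VALUE_OFFSET}; the double is {@code null} when the stored value is null.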
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment says that it's returning a primitive, but the method is returning a SerializablePair. Which one is supposed to be correct?

*/
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
  // The null check runs first, then the long at the start of the slot is read.
  final boolean valueIsNull = isValueNull(buf, position);
  final long storedLong = buf.getLong(position);

  // The double is only read from the buffer when the stored value is not null.
  Double storedValue = null;
  if (!valueIsNull) {
    storedValue = buf.getDouble(position + VALUE_OFFSET);
  }
  return new SerializablePair<>(storedLong, storedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -123,6 +130,27 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}

/**
 * Builds the vectorized aggregator for {@code fieldName}. A {@link FloatFirstVectorAggregator} is
 * returned when the column capabilities are unknown ({@code null}) or numeric; any other column type
 * gets the float nil aggregator.
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
// NOTE(review): both selectors below are created before the capabilities check, so they are built
// even on the path that returns the nil aggregator. The FloatFirstVectorAggregator constructor
// accepts a plain VectorValueSelector, so the downcast of the time selector is not required by it.
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
return new FloatFirstVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.floatNilVectorAggregator();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the Double one which I had comments on, please apply here too

}

/**
 * Reports whether this factory supports vectorized aggregation. The answer is unconditionally
 * {@code true}; {@code columnInspector} is not consulted.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}

@Override
public Comparator getComparator()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.first;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
 * Vectorized version of the 'earliest' aggregator for value selectors read as FLOAT.
 * Values are taken from the selector's float vector and stored in the aggregation buffer as floats.
 */
public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator
{
  // Last value copied into a buffer by putValue.
  float firstValue;

  /**
   * Creates the aggregator, passing both selectors to {@link NumericFirstVectorAggregator} and
   * starting {@code firstValue} at zero.
   */
  public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
  {
    super(timeSelector, valueSelector);
    this.firstValue = 0f;
  }

  /**
   * Writes a float zero into {@code buf} at {@code position}.
   */
  @Override
  public void initValue(ByteBuffer buf, int position)
  {
    buf.putFloat(position, 0f);
  }

  /**
   * Reads element {@code index} of the value selector's current float vector, remembers it in
   * {@code firstValue}, and writes it into {@code buf} at {@code position}.
   */
  @Override
  void putValue(ByteBuffer buf, int position, int index)
  {
    final float[] values = valueSelector.getFloatVector();
    firstValue = values[index];
    buf.putFloat(position, firstValue);
  }

  /**
   * @return a {@link SerializablePair} of the long stored at {@code position} and the float stored at
   * {@code position + VALUE_OFFSET}; the float is {@code null} when the stored value is null.
   */
  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    final boolean valueIsNull = isValueNull(buf, position);
    final long storedLong = buf.getLong(position);

    // The float is only read from the buffer when the stored value is not null.
    Float storedValue = null;
    if (!valueIsNull) {
      storedValue = buf.getFloat(position + VALUE_OFFSET);
    }
    return new SerializablePair<>(storedLong, storedValue);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.NilColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -122,6 +129,26 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
}
}

/**
 * Builds the vectorized aggregator for {@code fieldName}.
 *
 * A {@link LongFirstVectorAggregator} is returned when the column capabilities are unknown
 * ({@code null}) or numeric; any other column type gets the long nil aggregator. The value and time
 * selectors are created only on the path that actually uses them.
 *
 * @param columnSelectorFactory factory used to look up capabilities and create the selectors
 * @return the vectorized aggregator for this factory's field
 */
@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities != null && !capabilities.isNumeric()) {
    // Known non-numeric column: return the nil aggregator without building any selector.
    return NumericNilVectorAggregator.longNilVectorAggregator();
  }

  final VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
  // No downcast here: the LongFirstVectorAggregator constructor takes a plain VectorValueSelector.
  final VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
  return new LongFirstVectorAggregator(timeSelector, valueSelector);
}

/**
 * Reports whether this factory supports vectorized aggregation. The answer is unconditionally
 * {@code true}; {@code columnInspector} is not consulted.
 */
@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
return true;
}

@Override
public Comparator getComparator()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.first;

import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.vector.VectorValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
* Vectorized version of on heap 'earliest' aggregator for column selectors with type LONG.
*/
public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
{
long firstValue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason these are fields instead of just a local variable?


/**
 * Creates the aggregator, passing both selectors to {@link NumericFirstVectorAggregator} and
 * starting {@code firstValue} at zero.
 *
 * @param timeSelector  selector handed to the superclass as the time selector
 * @param valueSelector selector handed to the superclass as the value selector
 */
public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
{
super(timeSelector, valueSelector);
firstValue = 0;
}

/**
 * Writes a long zero into {@code buf} at {@code position}.
 */
@Override
public void initValue(ByteBuffer buf, int position)
{
buf.putLong(position, 0);
}


/**
 * Reads element {@code index} of the value selector's current long vector, remembers it in
 * {@code firstValue}, and writes it into {@code buf} at {@code position}.
 */
@Override
void putValue(ByteBuffer buf, int position, int index)
{
  final long[] values = valueSelector.getLongVector();
  firstValue = values[index];
  buf.putLong(position, firstValue);
}


/**
 * @return a {@code SerializablePair} of the long stored at {@code position} and the long stored at
 * {@code position + VALUE_OFFSET}; the second element is {@code null} when the stored value is null.
 */
@Nullable
@Override
public Object get(ByteBuffer buf, int position)
{
  final boolean valueIsNull = isValueNull(buf, position);
  final long storedLong = buf.getLong(position);

  // The value is only read from the buffer when the stored value is not null.
  Long storedValue = null;
  if (!valueIsNull) {
    storedValue = buf.getLong(position + VALUE_OFFSET);
  }
  return new SerializablePair<>(storedLong, storedValue);
}
}
Loading