-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vectorize earliest aggregator for both numeric and string types #14408
Changes from 3 commits
2b556f6
59118ae
6c139de
556b3de
cf6fe0f
df3db6e
cf88e00
a9a6fc2
5f65c42
f78ca05
6cae490
ef87989
4c5813d
67fce5f
ccfd600
1c372af
aa97181
f585412
4291709
44c3a4c
890c865
6e75540
3664b87
83a784a
47762c4
f4ddb7c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,14 +29,21 @@ | |
import org.apache.druid.query.aggregation.AggregatorFactory; | ||
import org.apache.druid.query.aggregation.AggregatorUtil; | ||
import org.apache.druid.query.aggregation.BufferAggregator; | ||
import org.apache.druid.query.aggregation.VectorAggregator; | ||
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; | ||
import org.apache.druid.query.cache.CacheKeyBuilder; | ||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; | ||
import org.apache.druid.segment.BaseDoubleColumnValueSelector; | ||
import org.apache.druid.segment.ColumnInspector; | ||
import org.apache.druid.segment.ColumnSelectorFactory; | ||
import org.apache.druid.segment.ColumnValueSelector; | ||
import org.apache.druid.segment.NilColumnValueSelector; | ||
import org.apache.druid.segment.column.ColumnCapabilities; | ||
import org.apache.druid.segment.column.ColumnHolder; | ||
import org.apache.druid.segment.column.ColumnType; | ||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector; | ||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory; | ||
import org.apache.druid.segment.vector.VectorValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
@@ -97,6 +104,12 @@ public DoubleFirstAggregatorFactory( | |
this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat(); | ||
} | ||
|
||
@Override | ||
public boolean canVectorize(ColumnInspector columnInspector) | ||
{ | ||
return true; | ||
} | ||
|
||
@Override | ||
public Aggregator factorize(ColumnSelectorFactory metricFactory) | ||
{ | ||
|
@@ -125,6 +138,23 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | |
} | ||
} | ||
|
||
@Override | ||
public VectorAggregator factorizeVector( | ||
VectorColumnSelectorFactory columnSelectorFactory | ||
) | ||
{ | ||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); | ||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); | ||
//time is always long | ||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( | ||
timeColumn); | ||
if (capabilities == null || capabilities.isNumeric()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the vectorized engine, capabilities being null means the column doesn't exist, so I think you can use the nil aggregator in that case. |
||
return new DoubleFirstVectorAggregator(timeSelector, valueSelector); | ||
} else { | ||
return NumericNilVectorAggregator.doubleNilVectorAggregator(); | ||
} | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.query.aggregation.first; | ||
|
||
import org.apache.druid.collections.SerializablePair; | ||
import org.apache.druid.segment.vector.VectorValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Vectorized version of the on-heap 'earliest' aggregator for column selectors with type DOUBLE. | ||
*/ | ||
public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator | ||
{ | ||
double firstValue; | ||
|
||
public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) | ||
{ | ||
super(timeSelector, valueSelector); | ||
firstValue = 0; | ||
} | ||
|
||
@Override | ||
public void initValue(ByteBuffer buf, int position) | ||
{ | ||
buf.putDouble(position, 0); | ||
} | ||
|
||
|
||
@Override | ||
void putValue(ByteBuffer buf, int position, int index) | ||
{ | ||
firstValue = valueSelector.getDoubleVector()[index]; | ||
buf.putDouble(position, firstValue); | ||
} | ||
|
||
|
||
/** | ||
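* @return A SerializablePair of the long stored at {@code position} and the double stored at {@code position + VALUE_OFFSET}; the second element is null when isValueNull reports the stored value as null. | ||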
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This comment says that it's returning a primitive, but the method is returning a SerializablePair. Which one is supposed to be correct? |
||
*/ | ||
@Nullable | ||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
final boolean rhsNull = isValueNull(buf, position); | ||
return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,14 +29,21 @@ | |
import org.apache.druid.query.aggregation.AggregatorFactory; | ||
import org.apache.druid.query.aggregation.AggregatorUtil; | ||
import org.apache.druid.query.aggregation.BufferAggregator; | ||
import org.apache.druid.query.aggregation.VectorAggregator; | ||
import org.apache.druid.query.aggregation.any.NumericNilVectorAggregator; | ||
import org.apache.druid.query.cache.CacheKeyBuilder; | ||
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; | ||
import org.apache.druid.segment.BaseFloatColumnValueSelector; | ||
import org.apache.druid.segment.ColumnInspector; | ||
import org.apache.druid.segment.ColumnSelectorFactory; | ||
import org.apache.druid.segment.ColumnValueSelector; | ||
import org.apache.druid.segment.NilColumnValueSelector; | ||
import org.apache.druid.segment.column.ColumnCapabilities; | ||
import org.apache.druid.segment.column.ColumnHolder; | ||
import org.apache.druid.segment.column.ColumnType; | ||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector; | ||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory; | ||
import org.apache.druid.segment.vector.VectorValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
@@ -123,6 +130,27 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) | |
} | ||
} | ||
|
||
@Override | ||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory) | ||
{ | ||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); | ||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); | ||
//time is always long | ||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( | ||
timeColumn); | ||
if (capabilities == null || capabilities.isNumeric()) { | ||
return new FloatFirstVectorAggregator(timeSelector, valueSelector); | ||
} else { | ||
return NumericNilVectorAggregator.floatNilVectorAggregator(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This looks like the Double one, which I had comments on; please apply them here too. |
||
} | ||
|
||
@Override | ||
public boolean canVectorize(ColumnInspector columnInspector) | ||
{ | ||
return true; | ||
} | ||
|
||
@Override | ||
public Comparator getComparator() | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.query.aggregation.first; | ||
|
||
import org.apache.druid.collections.SerializablePair; | ||
import org.apache.druid.segment.vector.VectorValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Vectorized version of the on-heap 'earliest' aggregator for column selectors with type FLOAT. | ||
*/ | ||
public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator | ||
{ | ||
float firstValue; | ||
|
||
public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) | ||
{ | ||
super(timeSelector, valueSelector); | ||
firstValue = 0; | ||
} | ||
|
||
@Override | ||
public void initValue(ByteBuffer buf, int position) | ||
{ | ||
buf.putFloat(position, 0); | ||
} | ||
|
||
|
||
@Override | ||
void putValue(ByteBuffer buf, int position, int index) | ||
{ | ||
firstValue = valueSelector.getFloatVector()[index]; | ||
buf.putFloat(position, firstValue); | ||
} | ||
|
||
|
||
/** | ||
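* @return A SerializablePair of the long stored at {@code position} and the float stored at {@code position + VALUE_OFFSET}; the second element is null when isValueNull reports the stored value as null. | ||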
*/ | ||
@Nullable | ||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
final boolean rhsNull = isValueNull(buf, position); | ||
return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.query.aggregation.first; | ||
|
||
import org.apache.druid.collections.SerializablePair; | ||
import org.apache.druid.segment.vector.VectorValueSelector; | ||
|
||
import javax.annotation.Nullable; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Vectorized version of the on-heap 'earliest' aggregator for column selectors with type LONG. | ||
*/ | ||
public class LongFirstVectorAggregator extends NumericFirstVectorAggregator | ||
{ | ||
long firstValue; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any reason these are fields instead of just local variables? |
||
|
||
public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector) | ||
{ | ||
super(timeSelector, valueSelector); | ||
firstValue = 0; | ||
} | ||
|
||
@Override | ||
public void initValue(ByteBuffer buf, int position) | ||
{ | ||
buf.putLong(position, 0); | ||
} | ||
|
||
|
||
@Override | ||
void putValue(ByteBuffer buf, int position, int index) | ||
{ | ||
firstValue = valueSelector.getLongVector()[index]; | ||
buf.putLong(position, firstValue); | ||
} | ||
|
||
|
||
/** | ||
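* @return A SerializablePair of the long stored at {@code position} and the long stored at {@code position + VALUE_OFFSET}; the second element is null when isValueNull reports the stored value as null. | ||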
*/ | ||
@Nullable | ||
@Override | ||
public Object get(ByteBuffer buf, int position) | ||
{ | ||
final boolean rhsNull = isValueNull(buf, position); | ||
return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things:
The time selector is cast to BaseLongVectorValueSelector,
but the arguments on DoubleFirstVectorAggregator
don't seem to care about the cast at all. Either it's important that we cast and we should enforce the cast, OR it's not important and we shouldn't force the cast. The current code makes me think that it's not important.