Vectorize earliest aggregator for both numeric and string types #14408
Conversation
...sing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java
@@ -205,7 +205,7 @@ public String getFormatString()
        "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
        // 37: time shift + expr agg (group by), uniform distribution high cardinality
        "SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
-       // 38: LATEST aggregator
+       // 38: LATEST aggregator long
        "SELECT LATEST(long1) FROM foo",
nit: fwiw these benchmarks were primarily meant for testing vectorized expression virtual columns, SqlBenchmark
is the general purpose place for measuring stuff, that said these don't hurt being here and they have a bit less baggage than SqlBenchmark
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
    timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
in the vectorized engine, capabilities being null means the column doesn't exist, and so you can use the nil aggregation i think?
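The reviewer's point can be sketched as follows. This is a hypothetical, simplified stand-in for Druid's factory and aggregator types (not the real interfaces): when capabilities come back null in the vectorized engine, the column does not exist, so a nil aggregator is the right result rather than treating the column as numeric.

```java
// Simplified stand-ins for ColumnCapabilities / VectorAggregator to
// illustrate the suggested null-capabilities handling.
class FirstAggregatorFactorySketch
{
  interface VectorAggregator {}

  static final class LongFirstVectorAggregator implements VectorAggregator {}
  static final class NilVectorAggregator implements VectorAggregator {}

  static final class ColumnCapabilities
  {
    private final boolean numeric;
    ColumnCapabilities(boolean numeric) { this.numeric = numeric; }
    boolean isNumeric() { return numeric; }
  }

  static VectorAggregator factorize(ColumnCapabilities capabilities)
  {
    // null capabilities => the column is missing in this segment, so the
    // aggregator can only ever produce the default/null result
    if (capabilities == null) {
      return new NilVectorAggregator();
    }
    return capabilities.isNumeric()
           ? new LongFirstVectorAggregator()
           : new NilVectorAggregator();
  }
}
```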
if (timeVector[row] < firstTime) {
  if (useDefault || nulls == null || !nulls[row]) {
    updateTimeWithValue(buf, position, timeVector[row], row);
  } else {
    updateTimeWithNull(buf, position, timeVector[row]);
  }
the docs seem to indicate that we pick the first non-null value, however looking at the non-vectorized aggregator it looks like we just pick the first value, which is also what we are doing here.
I guess allowing the native aggregator to pick the first value even if it is null is a bit more expressive than always ignoring null values, since we could always wrap this in a filtered aggregator (i vaguely remember having this exact discussion years ago for #9161), but otoh it doesn't seem like very typical behavior for SQL, which usually ignores null values for most aggregation functions. (the 'any' aggregator also behaves consistently with this and will return any value including null).
I wonder if we should either change the SQL conversion stuff to always wrap with a filtered agg to remove nulls, or modify the documentation to indicate that this function will return null values if the earliest row is null.
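The two semantics under discussion can be contrasted in a minimal sketch, using boxed `Long` values in place of Druid's null vectors (hypothetical helper names, not Druid APIs): the current native behavior takes the first value even if it is null, while typical SQL behavior is equivalent to wrapping the aggregator in a not-null filter.

```java
// "first value, even if null" (native behavior) vs. "first non-null value"
// (typical SQL aggregate behavior).
class EarliestSemanticsSketch
{
  // Returns the first element, null or not.
  static Long firstValue(Long[] values)
  {
    return values.length == 0 ? null : values[0];
  }

  // Returns the first non-null element; equivalent to a filtered aggregator
  // that drops null rows before aggregating.
  static Long firstNonNull(Long[] values)
  {
    for (Long v : values) {
      if (v != null) {
        return v;
      }
    }
    return null;
  }
}
```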
boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
long[] timeVector = timeSelector.getLongVector();

for (int i = 0; i < numRows; i++) {
since this is a hot loop, it might be worth splitting up the two loops into 'has a null vector' and 'doesnt have a null vector' cases, though that's worth measuring to see if it makes a difference
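The loop-splitting suggestion looks roughly like this sketch (plain arrays stand in for Druid's vector selectors; the method name is illustrative): hoisting the null-vector check out of the hot loop leaves each loop body with less per-row work, which may or may not matter in practice and should be benchmarked.

```java
// Split the hot loop into a "no null vector" tight loop and a
// "has null vector" loop, instead of checking nulls per row.
class SplitLoopSketch
{
  static long sumNonNull(long[] values, boolean[] nulls, int numRows)
  {
    long sum = 0;
    if (nulls == null) {
      // no null vector: branch-free loop body
      for (int i = 0; i < numRows; i++) {
        sum += values[i];
      }
    } else {
      for (int i = 0; i < numRows; i++) {
        if (!nulls[i]) {
          sum += values[i];
        }
      }
    }
    return sum;
  }
}
```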
@@ -1374,12 +1367,46 @@ public void testStringAnyInSubquery()
  );
}

@Test
public void testOffHeapEarliestGroupBy()
this seems already covered by other tests that removed 'skipVectorize' statements?
@@ -14721,4 +14743,39 @@ public void testFilterWithNVLAndNotIn()
  );
}

@Test
public void testEarliestVectorAggregators()
same comment about maybe redundant test
public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
{
  long firstValue;
any reason these are fields instead of just a local variable?
}

/**
 *Updates the time only to the appropriate position in buffer as the value is null
nit formatting (space after *)
Please remove Mockito from your tests. Just as a rule, if you ever run into Mockito when doing a change, take the time to remove it.
Also, we can do better optimizations by taking advantage of the dictionaries for the string versions, please implement those.
/**
 * @return The primitive object stored at the position in the buffer.
This comment says that it's returning a primitive, but the method is returning a SerializablePair. Which one is supposed to be correct?
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
    timeColumn);
Two things:
- you don't need either of these until after you've checked capabilities. Don't bother creating them if you don't need them.
- This is casting to BaseLongVectorValueSelector, but the arguments on DoubleFirstVectorAggregator don't seem to care about the cast at all. Either it's important that we cast and we force the cast, OR it's not important and we shouldn't force the cast. The current code makes me think that it's not important.
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
    timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
  return new FloatFirstVectorAggregator(timeSelector, valueSelector);
} else {
  return NumericNilVectorAggregator.floatNilVectorAggregator();
}
This looks like the Double one which I had comments on, please apply here too
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName);
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector(
    timeColumn);
if (capabilities != null) {
  return new StringFirstVectorAggregator(timeSelector, vSelector, maxStringBytes);
} else {
  return new StringFirstVectorAggregator(null, vSelector, maxStringBytes);
}
We can/should do this a bit more intelligently. Specifically, there are 3 different types of vector selectors that could be needed here and you will need to check column capabilities ahead of time to tell the difference:
- If it is a STRING and multi-valued, use the multivalue-dimension version
- If it is a STRING and single-valued, use the single value dimension version
- Otherwise use a VectorObjectSelector
Your implementation for (3) is in this PR already. For (1) and (2), you can read only the dictionary ids and just keep track of only the earliest dictionaryId (not the string, the dictionary id). Then, when get() is called, convert the dictionary id into the String and truncate the size if necessary.
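The dictionary-id optimization described above could look something like this sketch. It is not the PR's actual implementation: arrays stand in for Druid's SingleValueDimensionVectorSelector and its dictionary, and the method names are illustrative. The key idea is that aggregation tracks only `(time, dictionaryId)` and string lookup plus truncation happens once, at read time.

```java
// Track only the earliest dictionary id during aggregation; resolve it to a
// (possibly truncated) String only in get().
class DictionaryIdFirstSketch
{
  private long firstTime = Long.MAX_VALUE;
  private int firstId = -1;

  void aggregate(long[] times, int[] dictIds, int startRow, int endRow)
  {
    for (int i = startRow; i < endRow; i++) {
      if (times[i] < firstTime) {
        firstTime = times[i];
        firstId = dictIds[i];  // no string materialization here
      }
    }
  }

  // lookup + truncation happen only once, when the result is read
  String get(String[] dictionary, int maxStringBytes)
  {
    if (firstId < 0) {
      return null;  // nothing aggregated
    }
    String value = dictionary[firstId];
    return value.length() <= maxStringBytes ? value : value.substring(0, maxStringBytes);
  }
}
```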
private final BaseLongVectorValueSelector timeSelector;
private final VectorObjectSelector valueSelector;
private final int maxStringBytes;
//protected long firstTime;
commented code alert
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;

@RunWith(MockitoJUnitRunner.class)
Please re-write this to not use Mockito.
@Mock
private VectorValueSelector selector;
@Mock
private BaseLongVectorValueSelector timeSelector;
These are both interfaces, if there don't already exist test-oriented implementations of these interfaces, please create them instead of mocking things.
- Mockito needs to be killed from the codebase, it should not be used.
- The tests will always be easier to understand and debug if there is a test class implementation of the interface instead of using mocks.
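The mock-free style the reviewer asks for can be sketched like this. The interface here is a simplified, hypothetical version of Druid's long vector selector, not the real one; the point is that a tiny hand-written test implementation is deterministic and easy to debug, unlike a Mockito mock.

```java
// A hand-written test double for a (simplified) vector value selector,
// replacing a Mockito mock.
class TestSelectorSketch
{
  interface LongVectorSelector
  {
    long[] getLongVector();
    boolean[] getNullVector(); // null if no nulls present
  }

  // Deterministic test implementation: just returns the fixed arrays.
  static final class FixedLongVectorSelector implements LongVectorSelector
  {
    private final long[] values;
    private final boolean[] nulls;

    FixedLongVectorSelector(long[] values, boolean[] nulls)
    {
      this.values = values;
      this.nulls = nulls;
    }

    @Override public long[] getLongVector() { return values; }
    @Override public boolean[] getNullVector() { return nulls; }
  }
}
```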
Thanks for the comments, on the path to remove Mockito and address the optimizations as suggested
1. Checking capabilities first before creating selectors
2. Removing Mockito in tests for numeric first aggs
3. Removing unnecessary tests
…we can use the dictionary ids instead of the entire string
// select * from UNNEST(ARRAY[1,2,3]) as somu(d3) where somu.d3 IN ('a','b')
this.base = dataSource; // table
this.virtualColumn = virtualColumn; // MV_TO_ARRAY
this.unnestFilter = unnestFilter; // d3 in (a,b)
nit: these comments seem strange, did you mean to leave them here? Also unrelated to this PR?
)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities.isNumeric()) {
i think you need to check for capabilities being null too, you should be able to confirm this by having a test for a column that doesn't exist (which is what vector engine returns for capabilities if column is missing)
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities.isNumeric()) {
same comment re null check
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities.isNumeric()) {
ditto null check
firstTime = buf.getLong(position);
int index = startRow;
for (int i = startRow; i < endRow; i++) {
  if (valueVector[i].get(0) != 0) {
is this trying to check for null? if so you need to actually check that value 0 is null and not just the first value of the dictionary. if not, could you leave a comment about what is going on here?
if (timeVector[row] < firstTime) {
  firstTime = timeVector[row];
  buf.putLong(position, firstTime);
  buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
shouldn't this be checking for the value being null or not? or is the assumption that we never set the null bit here and instead translate it in the get method? If that is the case, why do we need a null byte at all instead of just storing a long and an int in the buffer? Or is it to distinguish the case between 'aggregate' not being called from actually aggregating something? (e.g. an empty group should probably always spit out a null value...)
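The role of the null byte the reviewer is probing at can be sketched with a plain ByteBuffer layout. The offsets and constants here are illustrative, not Druid's actual ones: the null byte distinguishes "nothing aggregated yet" (an empty group must produce null) from an aggregated value, which is why `get` must check it before reading the value.

```java
import java.nio.ByteBuffer;

// Buffer layout sketch: [ long timestamp | null byte | int value ].
class NullByteLayoutSketch
{
  static final int NULL_OFFSET = Long.BYTES;        // after the timestamp
  static final int VALUE_OFFSET = NULL_OFFSET + 1;  // after the null byte
  static final byte IS_NULL = 1;
  static final byte IS_NOT_NULL = 0;

  static void init(ByteBuffer buf, int position)
  {
    buf.putLong(position, Long.MAX_VALUE);
    buf.put(position + NULL_OFFSET, IS_NULL);       // empty group => null
    buf.putInt(position + VALUE_OFFSET, 0);
  }

  static void aggregate(ByteBuffer buf, int position, long time, int value)
  {
    if (time < buf.getLong(position)) {
      buf.putLong(position, time);
      buf.put(position + NULL_OFFSET, IS_NOT_NULL);
      buf.putInt(position + VALUE_OFFSET, value);
    }
  }

  static Integer get(ByteBuffer buf, int position)
  {
    // checking the null byte here is what keeps an empty group null
    if (buf.get(position + NULL_OFFSET) == IS_NULL) {
      return null;
    }
    return buf.getInt(position + VALUE_OFFSET);
  }
}
```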
int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
long earliest = buf.getLong(position);
String strValue = valueDimensionVectorSelector.lookupName(index);
return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
can this be wrong in the case where nothing was aggregated and id 0 in the dictionary is not null? it seems like we need to check the null byte here and return null if the null byte is set to null (since otherwise it appears as if it will be set to not null)
if (useDefault || nullValueVector == null || !nullValueVector[index]) {
  updateTimeWithValue(buf, position, firstTime, index);
} else {
  updateTimeWithNull(buf, position, firstTime);
}
this is somewhat confusing, the other loop is breaking if it finds a non-null value, but here we can still write a null i guess if it made it through the whole vector without breaking? That seems odd since it means that it finds the first non-null aggregator in a vector, else it finds the last timestamp in the first vector it reads?
buf.putLong(position, time);
buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
putValue(buf, position + VALUE_OFFSET, index);
}
some thoughts - since the value portion of this is basically the same behavior as NullableTypeStrategy, where there is a byte to track nulls and then the actual value bytes, I can't help but wonder if we could share some more code between all of the first/last aggregators by letting them use a NullableTypeStrategy for whatever the underlying selector type is. This definitely doesn't need to be done in this PR, just thinking ahead for if we supported additional types like arrays.
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;

public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest
these tests seem to have a flaw in that they only test one vector? I don't have an example handy, but it seems like it would be nicer if the tests used a cursor/offset and advanced through all of the rows to provide a more realistic test case.
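The multi-vector test shape the reviewer suggests can be sketched as follows (the batching helper is hypothetical, not a Druid API): rather than aggregating a single vector, the test advances through all rows in vector-sized batches, as a real cursor would, so behavior across vector boundaries is exercised.

```java
// Aggregate across several vector-sized batches instead of a single vector.
class MultiVectorTestSketch
{
  static long minAcrossBatches(long[] allRows, int vectorSize)
  {
    long min = Long.MAX_VALUE;
    for (int offset = 0; offset < allRows.length; offset += vectorSize) {
      int end = Math.min(offset + vectorSize, allRows.length);
      // one "aggregate" call per batch, as a cursor-driven test would do
      for (int i = offset; i < end; i++) {
        if (allRows[i] < min) {
          min = allRows[i];
        }
      }
    }
    return min;
  }
}
```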
@clintropolis I had some confusion regarding the use of the flag. It will be removed and the PR will be updated. Working on it and the other comments too
the query integration test failures look possibly related:
Expected: [{timestamp=2013-01-01T00:00:00.000Z, result={added=9.11526338E8, count=2815650, firstAdded=39.0, lastAdded=210.0, firstCount=1, lastCount=1, quantilesDoublesSketch=2390950, approxCountTheta=219483.4076460526, approxCountHLL=216700, delta=5.48967603E8, variation=1.274085073E9, delta_hist={breaks=[-2634692.25, -2048505.0, -1462317.75, -876130.4375, -289943.125, 296244.1875, 882431.5, 1468619.0], counts=[1.0, 2.0, 1.0, 56.0, 2815544.0, 41.0, 5.0]}, unique_users=229361.39005604674, deleted=-3.62558735E8, rows=2390950}}],
Actual: [{timestamp=2013-01-01T00:00:00.000Z, result={firstCount=1, added=9.11526338E8, count=2815650, delta=5.48967603E8, lastCount=1, rows=2390950, firstAdded=39.0, variation=1.274085073E9, unique_users=229361.39005604674, deleted=-3.62558735E8, quantilesDoublesSketch=0, approxCountTheta=219483.4076460526, approxCountHLL=216700, lastAdded=210.0, delta_hist={breaks=[-2634692.25, -2048505.0, -1462317.75, -876130.4375, -289943.125, 296244.1875, 882431.5, 1468619.0], counts=[1.0, 2.0, 1.0, 56.0, 2815544.0, 41.0, 5.0]}}}]
specifically: expected quantilesDoublesSketch=2390950 but actual is quantilesDoublesSketch=0, which might be a bug exposed by the query becoming vectorizable? We should look into what is happening here. We don't necessarily need to fix it in this PR, but it either needs to be fixed or these integration tests need to set the query context to not vectorize (assuming it is related to vectorization) so that the results don't change.
)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities != null && capabilities.isNumeric()) {
nit: can use Types.isNumeric(capabilities)
https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/column/Types.java#L120
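For illustration, the null-safe check that `Types.isNumeric` performs is essentially the following (a minimal re-creation, not Druid's actual code), which is why it can replace the explicit `capabilities != null &&` guard:

```java
// Minimal re-creation of a null-safe numeric check in the style of
// Druid's Types.isNumeric.
class NullSafeNumericSketch
{
  interface Capabilities
  {
    boolean isNumeric();
  }

  static boolean isNumeric(Capabilities capabilities)
  {
    // folds the null check into the helper so callers don't repeat it
    return capabilities != null && capabilities.isNumeric();
  }
}
```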
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities != null && capabilities.isNumeric()) {
nit: can use Types.isNumeric
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities != null && capabilities.isNumeric()) {
nit: can use Types.isNumeric
// iterate once over the object vector to find first non null element and
// determine if the type is Pair or not
boolean foldNeeded = false;
for (Object obj : objectsWhichMightBeStrings) {
i forget, are selectors that spit out SerializablePairLongString always not-null values? if not, do we need to check that we actually found something inside of the loop that wasn't null? im thinking of the case where the column is sparse and has lots of nulls, and the whole vector for this aggregate call is all nulls. I guess it ends up in the not-folding case, which is ok if the serializable pairs are never null
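The fold-detection loop under discussion amounts to something like this sketch, where `FakePair` is a hypothetical stand-in for SerializablePairLongString: the first non-null element decides whether folding is needed, and an all-null vector falls through to the non-folding path, which is the sparse-column case the reviewer flags.

```java
// Scan for the first non-null element; its type decides folding.
class FoldDetectionSketch
{
  static final class FakePair
  {
    final long time;
    final String value;
    FakePair(long time, String value) { this.time = time; this.value = value; }
  }

  static boolean foldNeeded(Object[] objects)
  {
    for (Object obj : objects) {
      if (obj != null) {
        return obj instanceof FakePair;
      }
    }
    // all-null vector: falls through to the non-folding path, which must
    // then tolerate nulls
    return false;
  }
}
```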
@Override
public boolean[] getNullVector()
{
  return NULLS;
nit: the real time column will never have null values afaik
)
{
  ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  if (capabilities != null && Types.isNumeric(capabilities)) {
oops, sorry for the confusion, Types.isNumeric includes the null check on capabilities so it isn't needed here
// the time vector is already sorted so the first element would be the earliest
// traverse accordingly
Sorry for not realizing this ... earlier(!), but actually this is only true when the timeSelector is for the __time column. If this aggregator is being used for LATEST_BY with some expression virtual column then this assumption is not correct, since the time values could be produced from any column which may or may not be sorted (and which might also have nulls, so we probably also need to check the null vector of the time selector).
It might be worth splitting out the implementation for earliest and earliest_by since the sorted __time column is probably a decent optimization in that specific case.
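The contrast the reviewer draws can be sketched as follows (plain arrays stand in for the selectors; method names are illustrative): with a sorted `__time` column the earliest row in a vector is simply the first row, while EARLIEST_BY on an arbitrary expression must scan the whole vector.

```java
// Sorted __time: earliest is the first row. Arbitrary time expression:
// full scan required.
class SortedTimeSketch
{
  // valid only when times is known to be sorted ascending (__time)
  static int earliestRowSorted(long[] times, int startRow)
  {
    return startRow;
  }

  // general case: unsorted time values require a full scan
  static int earliestRowUnsorted(long[] times, int startRow, int endRow)
  {
    int best = startRow;
    for (int i = startRow + 1; i < endRow; i++) {
      if (times[i] < times[best]) {
        best = i;
      }
    }
    return best;
  }
}
```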
…ime expression need not be in sorted order
This reverts commit 4291709.
"useCache": "true",
"vectorize": "false",
it should only be necessary to not vectorize queries which have the quantiles doubles sketch aggregator that seems possibly broken, please don't mark all of them like this
for (int i = startRow; i < endRow; i++) {
  index = i;
  if (nullTimeVector != null && nullTimeVector[index]) {
    continue;
how does this work if all of the time values are null? The docs seem to indicate that if the time column is null we just take the first value, but i'm not completely sure what that means (maybe the docs are wrong?). I wonder if we should we maybe treat rows with a null timestamp as if the timestamp were Long.MAX_VALUE and update the value? Though, I guess in that case we would have trouble distinguishing null time and null row from the initialized state. I suppose we could be consistent if we made a more general behavior that if there are multiple values with the same timestamp we take the first non-null value we encounter, though I suspect that would require change with the non-vector aggs too.
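The skip-null-timestamps behavior being questioned can be sketched like this (arrays stand in for the selectors; the helper is illustrative): rows whose timestamp is null never win "earliest", so a vector of all-null timestamps leaves the aggregator in its initialized state, which is the edge case the reviewer is asking about.

```java
// Rows with a null timestamp are skipped; an all-null-time vector
// changes nothing.
class NullTimeSketch
{
  static long earliestTime(long[] times, boolean[] nullTimes, long current)
  {
    long earliest = current;
    for (int i = 0; i < times.length; i++) {
      if (nullTimes != null && nullTimes[i]) {
        continue; // null timestamp: this row cannot win "earliest"
      }
      if (times[i] < earliest) {
        earliest = times[i];
      }
    }
    return earliest;
  }
}
```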
So currently in the case of non-null timestamps, we just take the first value. We iterate over the values and only update the first time if the current time is less than the earliest time. The issue arises when all timestamps are null. For earliest, where the time selector is __time, the chances of this happening are higher when used on a secondary timestamp through earliest_by. In such a case, should we even return any results? The docs note that
If expr comes from a relation with a timestamp column (like __time in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.
@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  final long[] timeVector = timeSelector.getLongVector();
i think we need to check the null vector here for the timeSelector
Addressed these
@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
  long[] timeVector = timeSelector.getLongVector();
same comment about null vector for time selector
if (timeSelector == null) {
  return;
}
long[] times = timeSelector.getLongVector();
same comment about checking null vector of timeSelector
@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
  long[] timeVector = timeSelector.getLongVector();
same comment about checking null vector of timeSelector
i think we should consider splitting the string aggregator from the pair aggregator in the future since it should make the handling of both a lot cleaner
Will open a separate PR to go down two different paths for pair and not pair
The latest agg has already been vectorized. This PR vectorizes the earliest aggregator so that both aggregators are vectorized. Benchmarks are run for the cases
This PR has: