
Vectorize earliest aggregator for both numeric and string types #14408

Merged: 26 commits into apache:master, Sep 5, 2023

Conversation

@somu-imply (Contributor) commented Jun 12, 2023:

The latest aggregator has already been vectorized. This PR vectorizes the earliest aggregator so that both aggregators are vectorized. Benchmarks were run for the following cases:

44 -> long
45 -> double
46 -> float 

Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt   Score   Error  Units
SqlExpressionBenchmark.querySql       44           5000000        false  avgt    5  38.656 ± 0.695  ms/op
SqlExpressionBenchmark.querySql       44           5000000        force  avgt    5  28.519 ± 1.110  ms/op
SqlExpressionBenchmark.querySql       45           5000000        false  avgt    5  38.667 ± 1.259  ms/op
SqlExpressionBenchmark.querySql       45           5000000        force  avgt    5  17.051 ± 0.523  ms/op
SqlExpressionBenchmark.querySql       46           5000000        false  avgt    5  38.579 ± 0.484  ms/op
SqlExpressionBenchmark.querySql       46           5000000        force  avgt    5  15.587 ± 0.766  ms/op

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@somu-imply marked this pull request as ready for review June 19, 2023 04:59
@@ -205,7 +205,7 @@ public String getFormatString()
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long4), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
// 37: time shift + expr agg (group by), uniform distribution high cardinality
"SELECT TIME_SHIFT(MILLIS_TO_TIMESTAMP(long5), 'PT1H', 1), string2, SUM(long1 * double4) FROM foo GROUP BY 1,2 ORDER BY 3",
- // 38: LATEST aggregator
+ // 38: LATEST aggregator long
"SELECT LATEST(long1) FROM foo",
A Member commented:

nit: FWIW these benchmarks were primarily meant for testing vectorized expression virtual columns; SqlBenchmark is the general-purpose place for measuring things. That said, these don't hurt being here, and they have a bit less baggage than SqlBenchmark.

//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
A Member commented:

In the vectorized engine, capabilities being null means the column doesn't exist, so you can use the nil aggregator, I think?
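
A minimal sketch of the suggested shape, reusing the class names that appear elsewhere in this review (FloatFirstVectorAggregator, NumericNilVectorAggregator); the exact signatures here are assumptions:

public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
  final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
  // null capabilities means the column doesn't exist in the vectorized engine,
  // so fall back to the nil aggregator instead of building selectors
  if (capabilities == null || !capabilities.isNumeric()) {
    return NumericNilVectorAggregator.floatNilVectorAggregator();
  }
  final VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
  // time is always long
  final BaseLongVectorValueSelector timeSelector =
      (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(timeColumn);
  return new FloatFirstVectorAggregator(timeSelector, valueSelector);
}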

Comment on lines +122 to +127
if (timeVector[row] < firstTime) {
if (useDefault || nulls == null || !nulls[row]) {
updateTimeWithValue(buf, position, timeVector[row], row);
} else {
updateTimeWithNull(buf, position, timeVector[row]);
}
A Member commented:

The docs seem to indicate that we pick the first non-null value; however, looking at the non-vectorized aggregator, it looks like we just pick the first value, which is also what we are doing here.

I guess allowing the native aggregator to pick the first value even if it is null is a bit more expressive than always ignoring null values, since we could always wrap this in a filtered aggregator (I vaguely remember having this exact discussion years ago for #9161), but OTOH it doesn't seem like very typical behavior for SQL, which usually ignores null values for most aggregation functions. (The 'any' aggregator also behaves consistently with this and will return any value, including null.)

I wonder if we should either change the SQL conversion stuff to always wrap with a filtered agg to remove nulls, or modify the documentation to indicate that this function will return null values if the earliest row is null.

boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
long[] timeVector = timeSelector.getLongVector();

for (int i = 0; i < numRows; i++) {
A Member commented:

since this is a hot loop, it might be worth splitting it into 'has a null vector' and 'doesn't have a null vector' cases, though that's worth measuring to see if it makes a difference (see the sketch below)
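
A sketch of that split, following the helper names in the snippet above (updateTimeWithValue/updateTimeWithNull are assumed to write the buffer, as in the quoted loop):

@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  final boolean[] nulls = useDefault ? null : valueSelector.getNullVector();
  final long[] timeVector = timeSelector.getLongVector();
  long firstTime = buf.getLong(position);

  if (nulls == null) {
    // no null vector: the per-row null check disappears from the hot loop
    for (int i = startRow; i < endRow; i++) {
      if (timeVector[i] < firstTime) {
        firstTime = timeVector[i];
        updateTimeWithValue(buf, position, timeVector[i], i);
      }
    }
  } else {
    for (int i = startRow; i < endRow; i++) {
      if (timeVector[i] < firstTime) {
        firstTime = timeVector[i];
        if (nulls[i]) {
          updateTimeWithNull(buf, position, timeVector[i]);
        } else {
          updateTimeWithValue(buf, position, timeVector[i], i);
        }
      }
    }
  }
}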

@@ -1374,12 +1367,46 @@ public void testStringAnyInSubquery()
);
}

@Test
public void testOffHeapEarliestGroupBy()
A Member commented:

this seems already covered by other tests that removed 'skipVectorize' statements?

@@ -14721,4 +14743,39 @@ public void testFilterWithNVLAndNotIn()
)
);
}

@Test
public void testEarliestVectorAggregators()
A Member commented:

same comment about maybe redundant test


public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
{
long firstValue;
A Member commented:

any reason these are fields instead of just local variables?

}

/**
*Updates the time only to the appropriate position in buffer as the value is null
A Member commented:

nit formatting (space after *)

@imply-cheddar (Contributor) left a review comment:

Please remove Mockito from your tests. Just as a rule, if you ever run into Mockito when making a change, take the time to remove it.

Also, we can do better optimizations by taking advantage of the dictionaries for the string versions; please implement those.



/**
* @return The primitive object stored at the position in the buffer.
A Contributor commented:

This comment says that it's returning a primitive, but the method is returning a SerializablePair. Which one is supposed to be correct?

Comment on lines 147 to 150
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
timeColumn);
A Contributor commented:

Two things:

  1. You don't need either of these until after you've checked capabilities. Don't bother creating them if you don't need them.
  2. This is casting to BaseLongVectorValueSelector, but the arguments on DoubleFirstVectorAggregator don't seem to care about the cast at all. Either it's important that we cast and we enforce the cast, OR it's not important and we shouldn't force the cast. The current code makes me think that it's not important.

Comment on lines 136 to 145
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
//time is always long
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
timeColumn);
if (capabilities == null || capabilities.isNumeric()) {
return new FloatFirstVectorAggregator(timeSelector, valueSelector);
} else {
return NumericNilVectorAggregator.floatNilVectorAggregator();
}
A Contributor commented:

This looks like the Double one, which I had comments on; please apply them here too.

Comment on lines 166 to 174
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName);
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector(
timeColumn);
if (capabilities != null) {
return new StringFirstVectorAggregator(timeSelector, vSelector, maxStringBytes);
} else {
return new StringFirstVectorAggregator(null, vSelector, maxStringBytes);
}
A Contributor commented:

We can/should do this a bit more intelligently. Specifically, there are 3 different types of vector selectors that could be needed here and you will need to check column capabilities ahead of time to tell the difference:

  1. If it is a STRING and multi-valued, use the multivalue-dimension version
  2. If it is a STRING and single-valued, use the single value dimension version
  3. Otherwise use a VectorObjectSelector

Your implementation for (3) is in this PR already. For (1) and (2), you can read only the dictionary ids and just keep track of the earliest dictionaryId (not the string, the dictionary id). Then, when get() is called, convert the dictionary id into the String and truncate the size if necessary.
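
A rough sketch of that dictionary-id approach for the single-valued case (getRowVector/lookupName are the methods on Druid's single-value dimension selector; the buffer offsets follow the NumericFirstVectorAggregator constants quoted later in this thread, and null handling is omitted):

@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
  final int[] rowIds = dimensionSelector.getRowVector(); // one dictionary id per row
  final long[] times = timeSelector.getLongVector();
  long firstTime = buf.getLong(position);
  for (int i = startRow; i < endRow; i++) {
    if (times[i] < firstTime) {
      firstTime = times[i];
      buf.putLong(position, firstTime);
      // store only the dictionary id, not the String
      buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, rowIds[i]);
    }
  }
}

@Override
public Object get(ByteBuffer buf, int position)
{
  // resolve the id to a (possibly truncated) String only when asked
  final long earliest = buf.getLong(position);
  final String value = dimensionSelector.lookupName(buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET));
  return new SerializablePairLongString(earliest, StringUtils.chop(value, maxStringBytes));
}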

private final BaseLongVectorValueSelector timeSelector;
private final VectorObjectSelector valueSelector;
private final int maxStringBytes;
//protected long firstTime;
A Contributor commented:

commented code alert

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;

@RunWith(MockitoJUnitRunner.class)
A Contributor commented:

Please re-write this to not use Mockito.

Comment on lines 53 to 56
@Mock
private VectorValueSelector selector;
@Mock
private BaseLongVectorValueSelector timeSelector;
A Contributor commented:

These are both interfaces; if test-oriented implementations of these interfaces don't already exist, please create them instead of mocking things (see the sketch below):

  1. Mockito needs to be killed from the codebase; it should not be used.
  2. The tests will always be easier to understand and debug with a test-class implementation of the interface instead of mocks.
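
For example, a hand-rolled test selector could look like this sketch (VectorValueSelector also extends VectorSizeInspector, which is where the two size methods come from):

// A fixed-data VectorValueSelector for tests, replacing a Mockito mock.
class TestVectorValueSelector implements VectorValueSelector
{
  private final long[] longs;
  @Nullable
  private final boolean[] nulls;

  TestVectorValueSelector(long[] longs, @Nullable boolean[] nulls)
  {
    this.longs = longs;
    this.nulls = nulls;
  }

  @Override
  public long[] getLongVector()
  {
    return longs;
  }

  @Override
  public float[] getFloatVector()
  {
    final float[] floats = new float[longs.length];
    for (int i = 0; i < longs.length; i++) {
      floats[i] = longs[i];
    }
    return floats;
  }

  @Override
  public double[] getDoubleVector()
  {
    final double[] doubles = new double[longs.length];
    for (int i = 0; i < longs.length; i++) {
      doubles[i] = longs[i];
    }
    return doubles;
  }

  @Nullable
  @Override
  public boolean[] getNullVector()
  {
    return nulls;
  }

  @Override
  public int getMaxVectorSize()
  {
    return longs.length;
  }

  @Override
  public int getCurrentVectorSize()
  {
    return longs.length;
  }
}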

@somu-imply (Contributor, Author) commented:

Thanks for the comments; I'm working to remove Mockito and to address the optimizations as suggested:

1. Checking capabilities first before creating selectors
2. Removing mockito in tests for numeric first aggs
3. Removing unnecessary tests
…we can use the dictionary ids instead of the entire string
Comment on lines 64 to 67
// select * from UNNEST(ARRAY[1,2,3]) as somu(d3) where somu.d3 IN ('a','b')
this.base = dataSource; // table
this.virtualColumn = virtualColumn; // MV_TO_ARRAY
this.unnestFilter = unnestFilter; // d3 in (a,b)
A Member commented:

nit: these comments seem strange, did you mean to leave them here? Also unrelated to this PR?

)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities.isNumeric()) {
A Member commented:

I think you need to check for capabilities being null too; you should be able to confirm this by adding a test for a column that doesn't exist (null capabilities is what the vector engine returns when a column is missing)

public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities.isNumeric()) {
A Member commented:

same comment re null check

public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities.isNumeric()) {
A Member commented:

ditto null check

firstTime = buf.getLong(position);
int index = startRow;
for (int i = startRow; i < endRow; i++) {
if (valueVector[i].get(0) != 0) {
A Member commented:

is this trying to check for null? If so, you need to actually check that value 0 is null, not just assume the first value of the dictionary is null. If not, could you leave a comment about what is going on here?

if (timeVector[row] < firstTime) {
firstTime = timeVector[row];
buf.putLong(position, firstTime);
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
A Member commented:

shouldn't this be checking whether the value is null? Or is the assumption that we never set the null bit here and instead translate it in the get method? If that is the case, why do we need a null byte at all instead of just storing a long and an int in the buffer? Or is it to distinguish 'aggregate' never having been called from actually aggregating something? (e.g. an empty group should probably always spit out a null value...)

Comment on lines 112 to 115
int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
long earliest = buf.getLong(position);
String strValue = valueDimensionVectorSelector.lookupName(index);
return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
A Member commented:

can this be wrong in the case where nothing was aggregated and id 0 in the dictionary is not null? It seems like we need to check the null byte here and return null if it is set (since otherwise the result appears to always be treated as not null).
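
A sketch of the suggested guard, reusing the constants from the snippet above (IS_NULL_BYTE mirrors the IS_NOT_NULL_BYTE already used in this PR):

@Override
public Object get(ByteBuffer buf, int position)
{
  // if nothing was ever aggregated, the null byte is still set; don't let
  // dictionary id 0 masquerade as a real value
  if (buf.get(position + NumericFirstVectorAggregator.NULL_OFFSET) == NullHandling.IS_NULL_BYTE) {
    return new SerializablePairLongString(buf.getLong(position), null);
  }
  final int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
  final long earliest = buf.getLong(position);
  final String strValue = valueDimensionVectorSelector.lookupName(index);
  return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
}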

Comment on lines +85 to +89
if (useDefault || nullValueVector == null || !nullValueVector[index]) {
updateTimeWithValue(buf, position, firstTime, index);
} else {
updateTimeWithNull(buf, position, firstTime);
}
A Member commented:

this is somewhat confusing: the other loop breaks if it finds a non-null value, but here we can still write a null, I guess, if we made it through the whole vector without breaking? That seems odd, since it means it finds the first non-null value in a vector, or else the last timestamp in the first vector it reads?

Comment on lines +142 to +145
buf.putLong(position, time);
buf.put(position + NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
putValue(buf, position + VALUE_OFFSET, index);
}
A Member commented:

some thoughts: since the value portion of this is basically the same behavior as NullableTypeStrategy, where there is a byte to track nulls and then the actual value bytes, I can't help but wonder if we could share more code between all of the first/last aggregators by letting them use a NullableTypeStrategy for whatever the underlying selector type is. This definitely doesn't need to be done in this PR; just thinking ahead for if we support additional types like arrays.

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;

public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest
A Member commented:

these tests seem to have a flaw in that they only test one vector? I don't have an example handy, but it seems like it would be nicer if the tests used a cursor/offset and advanced through all of the rows to provide a more realistic test case.
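
As a sketch, a multi-vector test could drive the aggregator roughly like this (VECTOR_SIZE, totalRows, and the cursor advancement are hypothetical placeholders):

final ByteBuffer buf = ByteBuffer.allocate(aggregatorFactory.getMaxIntermediateSizeWithNulls());
aggregator.init(buf, 0);
for (int start = 0; start < totalRows; start += VECTOR_SIZE) {
  // advance the underlying cursor/offset to the next vector here, then
  // aggregate it; startRow/endRow are relative to the current vector
  aggregator.aggregate(buf, 0, 0, Math.min(VECTOR_SIZE, totalRows - start));
}
// finally, compare aggregator.get(buf, 0) against the expected earliest pair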

@somu-imply (Contributor, Author) commented:

@clintropolis I had some confusion regarding the use of the flag; it will be removed and the PR updated. Working on it and the other comments too.

@clintropolis (Member) left a review comment:

the query integration test failures look possibly related:

Expected: [{timestamp=2013-01-01T00:00:00.000Z, result={added=9.11526338E8, count=2815650, firstAdded=39.0, lastAdded=210.0, firstCount=1, lastCount=1, quantilesDoublesSketch=2390950, approxCountTheta=219483.4076460526, approxCountHLL=216700, delta=5.48967603E8, variation=1.274085073E9, delta_hist={breaks=[-2634692.25, -2048505.0, -1462317.75, -876130.4375, -289943.125, 296244.1875, 882431.5, 1468619.0], counts=[1.0, 2.0, 1.0, 56.0, 2815544.0, 41.0, 5.0]}, unique_users=229361.39005604674, deleted=-3.62558735E8, rows=2390950}}],
Actual: [{timestamp=2013-01-01T00:00:00.000Z, result={firstCount=1, added=9.11526338E8, count=2815650, delta=5.48967603E8, lastCount=1, rows=2390950, firstAdded=39.0, variation=1.274085073E9, unique_users=229361.39005604674, deleted=-3.62558735E8, quantilesDoublesSketch=0, approxCountTheta=219483.4076460526, approxCountHLL=216700, lastAdded=210.0, delta_hist={breaks=[-2634692.25, -2048505.0, -1462317.75, -876130.4375, -289943.125, 296244.1875, 882431.5, 1468619.0], counts=[1.0, 2.0, 1.0, 56.0, 2815544.0, 41.0, 5.0]}}}]

specifically: expected quantilesDoublesSketch=2390950 but actual is quantilesDoublesSketch=0, which might be a bug exposed by the query becoming vectorizable? We should look into what is happening here. We don't necessarily need to fix it in this PR, but it either needs to be fixed, or these integration tests need to set the query context to not vectorize (assuming it is related to vectorization) so that the results don't change.

)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isNumeric()) {

public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isNumeric()) {
A Member commented:

nit: can use Types.isNumeric

public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities != null && capabilities.isNumeric()) {
A Member commented:

nit: can use Types.isNumeric

// iterate once over the object vector to find first non null element and
// determine if the type is Pair or not
boolean foldNeeded = false;
for (Object obj : objectsWhichMightBeStrings) {
A Member commented:

I forget: are selectors that spit out SerializablePairLongString always non-null values? If not, do we need to check that we actually found something inside the loop that wasn't null? I'm thinking of the case where the column is sparse and has lots of nulls, and the whole vector for this aggregate call is all nulls. I guess it ends up in the not-folding case, which is OK if the serializable pairs are never null.

@Override
public boolean[] getNullVector()
{
return NULLS;
@clintropolis (Member) commented Aug 3, 2023:

nit: the real time column will never have null values afaik

)
{
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
if (capabilities != null && Types.isNumeric(capabilities)) {
A Member commented:

oops sorry for the confusion, Types.isNumeric includes the null check on capabilities so it isn't needed here

Comment on lines 70 to 71
// the time vector is already sorted so the first element would be the earliest
// traverse accordingly
A Member commented:

Sorry for not realizing this ... earlier(!), but this is only true when the timeSelector is for the __time column. If this aggregator is being used for LATEST_BY with some expression virtual column, this assumption is not correct, since the time values could be produced from any column, which may or may not be sorted (and which might also have nulls, so we probably also need to check the null vector of the time selector).

It might be worth splitting out the implementation for earliest and earliest_by since the sorted __time column is probably a decent optimization in that specific case.
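
A sketch of what that split might look like inside aggregate() (timeSorted is a hypothetical flag distinguishing the __time-backed EARLIEST case from EARLIEST_BY; value-null handling via updateTimeWithNull is omitted for brevity):

final long[] times = timeSelector.getLongVector();
final boolean[] timeNulls = timeSelector.getNullVector(); // always null for __time

if (timeSorted) {
  // __time is sorted ascending, so only the first row of this vector can improve the minimum
  if (times[startRow] < buf.getLong(position)) {
    updateTimeWithValue(buf, position, times[startRow], startRow);
  }
} else {
  // EARLIEST_BY over an arbitrary expression: scan every row and skip null timestamps
  for (int i = startRow; i < endRow; i++) {
    if ((timeNulls == null || !timeNulls[i]) && times[i] < buf.getLong(position)) {
      updateTimeWithValue(buf, position, times[i], i);
    }
  }
}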

Comment on lines 16 to 17
"useCache": "true",
"vectorize": "false",
A Member commented:

it should only be necessary to skip vectorization for queries that use the quantiles doubles sketch aggregator that seems possibly broken; please don't mark all of them like this

for (int i = startRow; i < endRow; i++) {
index = i;
if (nullTimeVector != null && nullTimeVector[index]) {
continue;
@clintropolis (Member) commented Aug 24, 2023:

how does this work if all of the time values are null? The docs seem to indicate that if the time column is null we just take the first value, but I'm not completely sure what that means (maybe the docs are wrong?). I wonder if we should treat rows with a null timestamp as if the timestamp were Long.MAX_VALUE and update the value? Though in that case we would have trouble distinguishing a null time and null row from the initialized state. I suppose we could be consistent if we made a more general behavior: if there are multiple values with the same timestamp, take the first non-null value we encounter, though I suspect that would require changes to the non-vector aggs too.

@somu-imply (Contributor, Author) replied Aug 25, 2023:

So currently, in the case of non-null timestamps, we just take the first value: we iterate over the values and only update the first time if the current time is less than the earliest time. The issue arises when all timestamps are null. For earliest, the time selector is __time; the chances of this happening are higher when a secondary timestamp is used through earliest_by. In such a case, should we even return any results? The docs say:

If expr comes from a relation with a timestamp column (like __time in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column.

@Override
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
{
final long[] timeVector = timeSelector.getLongVector();
A Member commented:

i think we need to check the null vector here for the timeSelector

@somu-imply (Contributor, Author) replied:

Addressed these

@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
long[] timeVector = timeSelector.getLongVector();
A Member commented:

same comment about null vector for time selector

if (timeSelector == null) {
return;
}
long[] times = timeSelector.getLongVector();
A Member commented:

same comment about checking null vector of timeSelector

@Override
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
{
long[] timeVector = timeSelector.getLongVector();
A Member commented:

same comment about checking null vector of timeSelector

@clintropolis (Member) left a review comment:

i think we should consider splitting the string aggregator from the pair aggregator in the future since it should make the handling of both a lot cleaner

@soumyava merged commit 8088a76 into apache:master Sep 5, 2023
74 checks passed
@soumyava (Contributor) commented Sep 5, 2023:

Will open a separate PR to go down two different paths for pair and non-pair

@LakshSingla added this to the 28.0 milestone Oct 12, 2023