From 1b4297ab98a4f0571dbf4452c156eec00abd7419 Mon Sep 17 00:00:00 2001 From: Soumyava Das Date: Thu, 3 Aug 2023 19:37:55 -0700 Subject: [PATCH 1/3] Latest aggregator factorizes should accept time as VectorValueSelector instead of BaseLongVectorValueSelector --- .../first/StringFirstLastUtils.java | 4 +-- .../last/DoubleLastAggregatorFactory.java | 5 ++- .../last/FloatLastAggregatorFactory.java | 5 ++- .../last/LongLastAggregatorFactory.java | 3 +- .../last/StringLastAggregatorFactory.java | 4 +-- .../last/StringLastVectorAggregator.java | 6 ++-- .../druid/sql/calcite/CalciteQueryTest.java | 32 +++++++++++++++++++ 7 files changed, 44 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 6b93be7d7080..3a9b8818cd0b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -27,8 +27,8 @@ import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -79,7 +79,7 @@ public static boolean objectNeedsFoldCheck(Object obj) * index of bounds issues is the responsibility of the caller */ public static SerializablePairLongString readPairFromVectorSelectorsAtIndex( - BaseLongVectorValueSelector timeSelector, + VectorValueSelector timeSelector, VectorObjectSelector valueSelector, int index ) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 5e3fa6667928..ee6c330249d0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -43,7 +43,6 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -129,8 +128,8 @@ public VectorAggregator factorizeVector( { ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); - //time is always long - BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector( timeColumn); if (capabilities == null || capabilities.isNumeric()) { return new DoubleLastVectorAggregator(timeSelector, valueSelector); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java index ff23c3d96dc4..f9fb0b3a4c4d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -43,7 +43,6 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -141,8 +140,8 @@ public VectorAggregator factorizeVector( { ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); - //time is always long - BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector( timeColumn); if (capabilities == null || capabilities.isNumeric()) { return new FloatLastVectorAggregator(timeSelector, valueSelector); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java index a5304fe10927..c08967461879 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -42,7 +42,6 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; @@ -140,7 +139,7 @@ public VectorAggregator factorizeVector( { ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName); - BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector( + VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector( timeColumn); if (capabilities == null || capabilities.isNumeric()) { return new LongLastVectorAggregator(timeSelector, valueSelector); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index e1b39edc4add..2f135f96cca1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -42,9 +42,9 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -159,7 +159,7 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName); - BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector( + VectorValueSelector timeSelector = selectorFactory.makeValueSelector( timeColumn); if (capabilities != null) { return new StringLastVectorAggregator(timeSelector, vSelector, maxStringBytes); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java index f10584a2c748..a9c0b1e9ade1 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastVectorAggregator.java @@ -24,8 +24,8 @@ import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.DimensionHandlerUtils; -import org.apache.druid.segment.vector.BaseLongVectorValueSelector; import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -36,13 +36,13 @@ public class StringLastVectorAggregator implements VectorAggregator DateTimes.MIN.getMillis(), null ); - private final BaseLongVectorValueSelector timeSelector; + private final VectorValueSelector timeSelector; private final VectorObjectSelector valueSelector; private final int maxStringBytes; protected long lastTime; public StringLastVectorAggregator( - @Nullable final BaseLongVectorValueSelector timeSelector, + @Nullable final VectorValueSelector timeSelector, final VectorObjectSelector valueSelector, final int maxStringBytes ) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 87db26ea5a30..9fabf77b3634 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -853,6 +853,38 @@ public void testAnyAggregatorsOnHeapNumericNulls() ); } + // This test is to check if expressions are accepted properly by the vectorized last aggregator + @Test + public void testLatestVectorAggregatorsOnExpression() + { + notMsqCompatible(); + testQuery( + "SELECT \n" + + " LATEST_BY(m1, MILLIS_TO_TIMESTAMP(BITWISE_SHIFT_RIGHT(TIMESTAMP_TO_MILLIS(__time), 3)))\n" + + " FROM druid.foo GROUP BY TIME_FLOOR(__time, 'P1Y', null, 'America/Los_Angeles')", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .granularity(new PeriodGranularity(Period.years(1), null, DateTimes.inferTzFromString(LOS_ANGELES))) + .virtualColumns( + expressionVirtualColumn("v1", "bitwiseShiftRight(\"__time\",3)", ColumnType.LONG) + ) + .aggregators( + aggregators( + new FloatLastAggregatorFactory("a0", "m1", "v1") + ) + ) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{1.0f}, + new Object[]{4.0f}, + new Object[]{6.0f} + ) + ); + } // This test the off-heap (buffer) version of the AnyAggregator (Double/Float/Long) against numeric columns // that have null values (when run in SQL compatible null mode) @Test From 081b7e10b7a11b57be256f9a56bd587d773df736 Mon Sep 17 00:00:00 2001 From: Soumyava <93540295+somu-imply@users.noreply.github.com> Date: Thu, 3 Aug 2023 20:46:24 -0700 Subject: [PATCH 2/3] Update sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- .../java/org/apache/druid/sql/calcite/CalciteQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 9fabf77b3634..c89019571b66 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -853,7 +853,7 @@ public void testAnyAggregatorsOnHeapNumericNulls() ); } - // This test is to check if expressions are accepted properly by the vectorized last aggregator + // This test is to check if time expressions are accepted properly by the vectorized last aggregator @Test public void testLatestVectorAggregatorsOnExpression() { From 1e39f8dfecb4e5b88b17849dfd63f2b041352998 Mon Sep 17 00:00:00 2001 From: Soumyava <93540295+somu-imply@users.noreply.github.com> Date: Thu, 3 Aug 2023 20:46:29 -0700 Subject: [PATCH 3/3] Update sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- .../java/org/apache/druid/sql/calcite/CalciteQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index c89019571b66..e14be37e98b0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -855,7 +855,7 @@ public void testAnyAggregatorsOnHeapNumericNulls() // This test is to check if time expressions are accepted properly by the vectorized last aggregator @Test - public void testLatestVectorAggregatorsOnExpression() + public void testLatestVectorAggregatorsOnTimeExpression() { notMsqCompatible(); testQuery(