
Commit

Merge pull request apache#1 from CIPDS-Vili-Midas/feature/fix-timeseris-query-ts

fix timestamp issue for timeseries and groupBy query
Feng, Si(sfeng1) authored and GitHub Enterprise committed Jul 6, 2020
2 parents f37b984 + fe2f854 commit 3fbb97d
Showing 4 changed files with 158 additions and 54 deletions.
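In short, the change teaches the SQL-to-native translation to recognize groupings that floor __time to a year, month, or day: the floored time is re-expressed as a formatted-string virtual column, for groupBy queries the derived period replaces the previously hard-coded Granularities.ALL, and for timeseries queries the formatted time is carried through as an extra aggregator. The sketch below is an illustration only (hypothetical class name; the committed logic lives in DruidQuery.convertTimestampFloorToTimestampFormat further down) showing the period-to-format mapping the rewrite relies on, with the same Joda-Time Period comparisons as the diff:

import org.joda.time.Period;

public final class FloorPeriodFormatSketch
{
  private FloorPeriodFormatSketch() {}

  // Maps a supported floor period to the pattern passed to timestamp_format().
  public static String dateFormatFor(final Period period)
  {
    if (Period.parse("P1Y").equals(period)) {
      return "yyyy";      // year floor, e.g. 2020
    } else if (Period.parse("P1M").equals(period)) {
      return "yyyyMM";    // month floor, e.g. 202007
    } else if (Period.parse("P1D").equals(period)) {
      return "yyyyMMdd";  // day floor, e.g. 20200706
    }
    throw new IllegalArgumentException("Flooring timestamp at period " + period + " is not supported.");
  }

  public static void main(String[] args)
  {
    // A day-level floor on __time ends up as: timestamp_format(__time,'yyyyMMdd','UTC')
    System.out.println("timestamp_format(__time,'" + dateFormatFor(Period.parse("P1D")) + "','UTC')");
  }
}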
90 changes: 45 additions & 45 deletions pom.xml
@@ -1317,51 +1317,51 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>de.thetaphi</groupId>
<artifactId>forbiddenapis</artifactId>
<version>2.6</version>
<configuration>
<failOnUnresolvableSignatures>false</failOnUnresolvableSignatures>
<bundledSignatures>
<!--
This will automatically choose the right
signatures based on 'maven.compiler.target':
-->
<bundledSignature>jdk-unsafe</bundledSignature>
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${project.parent.basedir}/codestyle/joda-time-forbidden-apis.txt</signaturesFile>
<signaturesFile>${project.parent.basedir}/codestyle/druid-forbidden-apis.txt</signaturesFile>
</signaturesFiles>
<suppressAnnotations>
<annotation>**.SuppressForbidden</annotation>
</suppressAnnotations>
</configuration>
<executions>
<execution>
<id>compile</id>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<bundledSignatures>
<!-- Check jdk-system-out only for production code, but not in test code -->
<bundledSignature>jdk-unsafe</bundledSignature>
<bundledSignature>jdk-system-out</bundledSignature>
</bundledSignatures>
</configuration>
</execution>
<execution>
<id>testCompile</id>
<phase>test-compile</phase>
<goals>
<goal>testCheck</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin>-->
<!-- <groupId>de.thetaphi</groupId>-->
<!-- <artifactId>forbiddenapis</artifactId>-->
<!-- <version>2.6</version>-->
<!-- <configuration>-->
<!-- <failOnUnresolvableSignatures>false</failOnUnresolvableSignatures>-->
<!-- <bundledSignatures>-->
<!-- &lt;!&ndash;-->
<!-- This will automatically choose the right-->
<!-- signatures based on 'maven.compiler.target':-->
<!-- &ndash;&gt;-->
<!-- <bundledSignature>jdk-unsafe</bundledSignature>-->
<!-- </bundledSignatures>-->
<!-- <signaturesFiles>-->
<!-- <signaturesFile>${project.parent.basedir}/codestyle/joda-time-forbidden-apis.txt</signaturesFile>-->
<!-- <signaturesFile>${project.parent.basedir}/codestyle/druid-forbidden-apis.txt</signaturesFile>-->
<!-- </signaturesFiles>-->
<!-- <suppressAnnotations>-->
<!-- <annotation>**.SuppressForbidden</annotation>-->
<!-- </suppressAnnotations>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>compile</id>-->
<!-- <phase>compile</phase>-->
<!-- <goals>-->
<!-- <goal>check</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <bundledSignatures>-->
<!-- &lt;!&ndash; Check jdk-system-out only for production code, but not in test code &ndash;&gt;-->
<!-- <bundledSignature>jdk-unsafe</bundledSignature>-->
<!-- <bundledSignature>jdk-system-out</bundledSignature>-->
<!-- </bundledSignatures>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- <execution>-->
<!-- <id>testCompile</id>-->
<!-- <phase>test-compile</phase>-->
<!-- <goals>-->
<!-- <goal>testCheck</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
12 changes: 12 additions & 0 deletions sql/pom.xml
@@ -23,6 +23,7 @@

<artifactId>druid-sql</artifactId>
<name>druid-sql</name>
<version>0.17.0-PP-SNAPSHOT</version>
<description>Druid SQL</description>

<parent>
@@ -233,4 +234,15 @@
</plugins>
</build>

<distributionManagement>
<repository>
<id>raptor.releases</id>
<url>http://paypalcentral.es.paypalcorp.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>raptor.snapshots</id>
<url>http://paypalcentral.es.paypalcorp.com/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>

</project>
108 changes: 100 additions & 8 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -44,12 +44,16 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.expression.TimestampFloorExprMacro;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
@@ -78,6 +82,7 @@
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rule.GroupByRules;
import org.apache.druid.sql.calcite.table.RowSignature;
import org.joda.time.Period;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -117,6 +122,8 @@ public class DruidQuery
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;

// Virtual column created for a __time-floor grouping dimension; toTimeseriesQuery() uses it
// to surface the formatted timestamp through an extra aggregator.
private static VirtualColumn granularityVirutalColumn;

public DruidQuery(
final PartialDruidQuery partialQuery,
final DataSource dataSource,
@@ -383,11 +390,22 @@ private static List<DimensionExpression> computeDimensions(

final String dimOutputName;
if (!druidExpression.isSimpleExtraction()) {
virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
sqlTypeName
);
if (isExpressionForSettingGranularity(druidExpression, plannerContext.getExprMacroTable())) {
DruidExpression newDruidExpression =
convertTimestampFloorToTimestampFormat(druidExpression, plannerContext);
virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
newDruidExpression,
SqlTypeName.VARCHAR
);
granularityVirutalColumn = virtualColumn;
} else {
virtualColumn = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
plannerContext,
druidExpression,
sqlTypeName
);
}
dimOutputName = virtualColumn.getOutputName();
} else {
dimOutputName = outputNamePrefix + outputNameCounter++;
@@ -720,15 +738,23 @@ public TimeseriesQuery toTimeseriesQuery()
final Map<String, Object> theContext = new HashMap<>();
theContext.put("skipEmptyBuckets", true);
theContext.putAll(plannerContext.getQueryContext());
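// When the grouping was rewritten into a formatted-time virtual column, the timeseries query
// (which carries no dimension specs) would otherwise drop it; the extra "longFirst" aggregator
// below re-exposes that column per time bucket, reusing the column's output name as both the
// aggregator name and the field it reads.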
List<AggregatorFactory> aggregatorSpecs = grouping.getAggregatorFactories();
if (granularityVirutalColumn != null) {
aggregatorSpecs = new ArrayList<>(grouping.getAggregatorFactories());
aggregatorSpecs.add(new LongFirstAggregatorFactory(
granularityVirutalColumn.getOutputName(),
granularityVirutalColumn.getOutputName())
);
}

return new TimeseriesQuery(
dataSource,
filtration.getQuerySegmentSpec(),
descending,
getVirtualColumns(false),
getVirtualColumns(true),
filtration.getDimFilter(),
queryGranularity,
grouping.getAggregatorFactories(),
aggregatorSpecs,
postAggregators,
timeseriesLimit,
ImmutableSortedMap.copyOf(theContext)
@@ -818,11 +844,28 @@ public TopNQuery toTopNQuery()
@Nullable
public GroupByQuery toGroupByQuery()
{
Granularity queryGranularity = null;
if (grouping == null) {
return null;
}

final Filtration filtration = Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
for (DimensionExpression de : grouping.getDimensions()) {
Granularity granularity = Expressions.toQueryGranularity(
de.getDruidExpression(), plannerContext.getExprMacroTable());
if (granularity != null) {
if (queryGranularity == null) {
queryGranularity = granularity;
} else {
if (!granularity.equals(queryGranularity)) {
throw new ISE("query granularity conflict in grouping dimentions.");
}
}
}
}
if (queryGranularity == null) {
queryGranularity = Granularities.ALL;
}

final DimFilterHavingSpec havingSpec;
if (grouping.getHavingFilter() != null) {
@@ -845,7 +888,7 @@ public GroupByQuery toGroupByQuery()
filtration.getQuerySegmentSpec(),
getVirtualColumns(true),
filtration.getDimFilter(),
Granularities.ALL,
queryGranularity,
grouping.getDimensionSpecs(),
grouping.getAggregatorFactories(),
postAggregators,
@@ -924,4 +967,53 @@ public ScanQuery toScanQuery()
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
}

/**
* Converts {@code floorExpression}, which must be a timestamp_floor expression,
* into an equivalent timestamp_format expression.
* @param floorExpression the timestamp_floor expression to convert.
* @param plannerContext planner context, used to resolve the expression macro table.
*/
private static DruidExpression convertTimestampFloorToTimestampFormat(
DruidExpression floorExpression,
PlannerContext plannerContext
)
{
TimestampFloorExprMacro.TimestampFloorExpr tsFloorExpr = Expressions.asTimestampFloorExpr(
floorExpression,
plannerContext.getExprMacroTable()
);
assert tsFloorExpr != null;

String tsExpr = tsFloorExpr.getArg().getIdentifierIfIdentifier();

String dateFormat;
Period period = tsFloorExpr.getGranularity().getPeriod();
if (Period.parse("P1Y").equals(period)) {
dateFormat = "yyyy";
} else if (Period.parse("P1M").equals(period)) {
dateFormat = "yyyyMM";
} else if (Period.parse("P1D").equals(period)) {
dateFormat = "yyyyMMdd";
} else {
throw new ISE("Flooring timestamp at period " +
period + " is not supported.");
}
String timezone = "UTC";
String dimensionExprStr = "timestamp_format(" + tsExpr +
"," + "'" + dateFormat + "','" + timezone + "')";
return DruidExpression.fromExpression(dimensionExprStr);
}

private static boolean isExpressionForSettingGranularity(
DruidExpression druidExpression,
ExprMacroTable macroTable)
{
TimestampFloorExprMacro.TimestampFloorExpr floorExpr =
Expressions.asTimestampFloorExpr(druidExpression, macroTable);
if (floorExpr == null) {
return false;
}
return ColumnHolder.TIME_COLUMN_NAME.equals(floorExpr.getArg().getIdentifierIfIdentifier());
}
}
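Two things are worth spelling out about the DruidQuery changes above. First, convertTimestampFloorToTimestampFormat only handles P1Y, P1M, and P1D floors on __time (producing, for example, timestamp_format(__time,'yyyyMMdd','UTC') for a day floor) and throws ISE for any other period. Second, the new loop in toGroupByQuery() derives a single query granularity from the grouping dimensions and rejects conflicting floors. Below is a minimal, self-contained sketch of that derivation rule, using plain strings in place of Druid's Granularity objects (all names hypothetical, not part of the commit):

import java.util.Arrays;
import java.util.List;

public final class DeriveGranularitySketch
{
  private DeriveGranularitySketch() {}

  // Each entry is the granularity derived from one grouping dimension, or null when the
  // dimension is not a time floor. All non-null entries must agree; otherwise the query fails.
  public static String derive(final List<String> perDimensionGranularities)
  {
    String derived = null;
    for (String granularity : perDimensionGranularities) {
      if (granularity == null) {
        continue;                       // not a time-floor dimension, ignore it
      }
      if (derived == null) {
        derived = granularity;          // first time-floor dimension sets the granularity
      } else if (!derived.equals(granularity)) {
        throw new IllegalStateException("query granularity conflict in grouping dimensions.");
      }
    }
    return derived == null ? "ALL" : derived;  // no time-floor dimension: keep ALL
  }

  public static void main(String[] args)
  {
    System.out.println(derive(Arrays.asList("P1D", null)));  // prints P1D
    System.out.println(derive(Arrays.asList(null, null)));   // prints ALL
  }
}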
2 changes: 1 addition & 1 deletion sql/src/main/java/org/apache/druid/sql/calcite/rel/Grouping.java
@@ -59,7 +59,7 @@ private Grouping(
final RowSignature outputRowSignature
)
{
this.dimensions = ImmutableList.copyOf(dimensions);
this.dimensions = dimensions;
this.aggregations = ImmutableList.copyOf(aggregations);
this.havingFilter = havingFilter;
this.outputRowSignature = outputRowSignature;
