From 00f9d4a00abd50012c5d7283aa8362cf96c8d91b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Sep 2023 13:17:13 -0700 Subject: [PATCH 1/4] SQL: Plan non-equijoin conditions as cross join followed by filter. Druid has previously refused to execute joins with non-equality-based conditions. This was well-intentioned: the idea was to push people to write their queries in a different, hopefully more performant way. But as we're moving towards fuller SQL support, it makes more sense to allow these conditions to go through with the best plan we can come up with: a cross join followed by a filter. In some cases this will allow the query to run, and people will be happy with that. In other cases, it will run into resource limits during execution. But we should at least give the query a chance. This patch also updates the documentation to explain how people can tell whether their queries are being planned this way. --- docs/querying/datasource.md | 36 +++--- .../msq/test/CalciteSelectQueryMSQTest.java | 2 +- .../calcite/planner/CalciteRulesManager.java | 10 ++ .../sql/calcite/CalciteJoinQueryTest.java | 104 ++++++++++++++++++ .../druid/sql/calcite/CalciteQueryTest.java | 25 +---- .../DecoupledPlanningCalciteQueryTest.java | 7 +- 6 files changed, 143 insertions(+), 41 deletions(-) diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md index ae87431587a1..947df3492efd 100644 --- a/docs/querying/datasource.md +++ b/docs/querying/datasource.md @@ -320,12 +320,14 @@ Join datasources allow you to do a SQL-style join of two datasources. Stacking j you to join arbitrarily many datasources. In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means -that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition -must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), -[inline](#inline), and [query](#query) datasources. +that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition +must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form +(see [Joins in SQL](#joins-in-sql) execute as part of a native join. Other kinds of conditions execute as a cross join +(cartesian product) plus a filter. -Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you -use join datasources. +This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and +[query](#query) datasources. Refer to the [Query execution](query-execution.md#join) page for more details on how +queries are executed when you use join datasources. #### Joins in SQL @@ -335,21 +337,23 @@ SQL joins take the form: [ INNER | LEFT [OUTER] ] JOIN ON ``` -The condition must involve only equalities, but functions are okay, and there can be multiple equalities ANDed together. -Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND t1.y = t2.y` can all be handled. Conditions -like `t1.x <> t2.x` cannot currently be handled. +Any condition is accepted, but only certain kinds of conditions execute as part of a native join. To execute efficiently +as part of a native join, a condition must be a single clause like the following, or an `AND` of clauses like the +following: -Note that Druid SQL is less rigid than what native join datasources can handle. In cases where a SQL query does -something that is not allowed as-is with a native join datasource, Druid SQL will generate a subquery. This can have -a substantial effect on performance and scalability, so it is something to watch out for. Some examples of when the -SQL layer will generate subqueries include: +- Equality between fields of the same type on each side, like `t1 JOIN t2 ON t1.x = t2.x`. +- Equality between a function call on one side, and a field on the other side, like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`. +- The equality operator may be `=` (which does not match nulls) or `IS NOT DISTINCT FROM` (which does match nulls). -- Joining a regular Druid table to itself, or to another regular Druid table. The native join datasource can accept -a table on the left-hand side, but not the right, so a subquery is needed. +In other cases, Druid will either insert a subquery below the join, or will use a cross join (cartesian product) +followed by a filter. Joins executed in these ways may run into resource or performance constraints. To determine +if your query is using one of these execution paths, run `EXPLAIN PLAN FOR ` and look for the following: -- Join conditions where the expressions on either side are of different types. +- `query` type datasources under the `left` or `right` key of your `join` datasource. +- `join` type datasource with `condition` set to `"1"` (cartesian product) followed by a `filter` that encodes the + condition you provided. -- Join conditions where the right-hand expression is not a direct column access. +In these cases, you may be able to improve the performance of your query by rewriting it. For more information about how Druid translates SQL to native queries, refer to the [Druid SQL](sql-translation.md) documentation. diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java index 5ee3ba875388..d7131d0b7ef8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java @@ -144,7 +144,7 @@ public void testExactCountDistinctWithFilter() @Ignore @Override - public void testUnplannableQueries() + public void testUnplannableScanOrderByNonTime() { } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java index 8d2f1103922b..a3a46de2ba57 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java @@ -162,6 +162,15 @@ public class CalciteRulesManager CoreRules.INTERSECT_TO_DISTINCT ); + /** + * Rules from Calcite that are not part of Calcite's standard set, but that we use anyway. + */ + private static final List EXTRA_CALCITE_RULES = + ImmutableList.of( + // Useful for planning funky join conditions as filters on top of cross joins. + CoreRules.JOIN_EXTRACT_FILTER + ); + /** * Rules from {@link org.apache.calcite.plan.RelOptRules#ABSTRACT_RELATIONAL_RULES}, minus: * @@ -340,6 +349,7 @@ public List baseRuleSet(final PlannerContext plannerContext) rules.addAll(BASE_RULES); rules.addAll(ABSTRACT_RULES); rules.addAll(ABSTRACT_RELATIONAL_RULES); + rules.addAll(EXTRA_CALCITE_RULES); if (plannerContext.getJoinAlgorithm().requiresSubquery()) { rules.addAll(FANCY_JOIN_RULES); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index a19011d9bdc7..26cd0318db35 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -4443,6 +4443,110 @@ public void testCountDistinctOfLookupUsingJoinOperator(Map query ); } + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testJoinWithNonEquiCondition(Map queryContext) + { + // Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a + // cross join with a filter. + cannotVectorize(); + + testQuery( + "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("m1") + .context(queryContext) + .build() + ), + "j0.", + "1", + JoinType.INNER + ) + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(expressionFilter("(\"m1\" > \"j0.m1\")")) + .columns("j0.m1", "m1") + .context(queryContext) + .build() + ), + sortIfSortBased( + ImmutableList.of( + new Object[]{2.0f, 1.0f}, + new Object[]{3.0f, 1.0f}, + new Object[]{3.0f, 2.0f}, + new Object[]{4.0f, 1.0f}, + new Object[]{4.0f, 2.0f}, + new Object[]{4.0f, 3.0f}, + new Object[]{5.0f, 1.0f}, + new Object[]{5.0f, 2.0f}, + new Object[]{5.0f, 3.0f}, + new Object[]{5.0f, 4.0f}, + new Object[]{6.0f, 1.0f}, + new Object[]{6.0f, 2.0f}, + new Object[]{6.0f, 3.0f}, + new Object[]{6.0f, 4.0f}, + new Object[]{6.0f, 5.0f} + ), + 1, + 0 + ) + ); + } + + @Test + @Parameters(source = QueryContextForJoinProvider.class) + public void testJoinWithEquiAndNonEquiCondition(Map queryContext) + { + // Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a + // cross join with a filter. + cannotVectorize(); + + testQuery( + "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6", + queryContext, + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + join( + new TableDataSource(CalciteTests.DATASOURCE1), + new QueryDataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("m1") + .context(queryContext) + .build() + ), + "j0.", + "1", + JoinType.INNER + ) + ) + .virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.FLOAT)) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters( + and( + expressionFilter("(\"m1\" == \"j0.m1\")"), + equality("v0", 6f, ColumnType.FLOAT) + ) + ) + .columns("j0.m1", "m1") + .context(queryContext) + .build() + ), + ImmutableList.of(new Object[]{3.0f, 3.0f}) + ); + } + @Test @Parameters(source = QueryContextForJoinProvider.class) public void testUsingSubqueryAsPartOfAndFilter(Map queryContext) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 582408003719..459681ebefec 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -5633,32 +5633,15 @@ public void testCountStarWithNotOfDegenerateFilter() } @Test - public void testUnplannableQueries() + public void testUnplannableScanOrderByNonTime() { + // Scan can ORDER BY non-time in MSQ. notMsqCompatible(); - // All of these queries are unplannable because they rely on features Druid doesn't support. - // This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation. - // It's also here so when we do support these features, we can have "real" tests for these queries. - final Map queries = ImmutableMap.of( - // SELECT query with order by non-__time. + assertQueryIsUnplannable( "SELECT dim1 FROM druid.foo ORDER BY dim1", - "SQL query requires order by non-time column [[dim1 ASC]], which is not supported.", - - // JOIN condition with not-equals (<>). - "SELECT foo.dim1, foo.dim2, l.k, l.v\n" - + "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k", - "SQL requires a join with 'NOT_EQUALS' condition that is not supported.", - - // JOIN condition with a function of both sides. - "SELECT foo.dim1, foo.dim2, l.k, l.v\n" - + "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n", - "SQL requires a join with 'GREATER_THAN' condition that is not supported." + "SQL query requires order by non-time column [[dim1 ASC]], which is not supported." ); - - for (final Map.Entry queryErrorPair : queries.entrySet()) { - assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue()); - } } @Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DecoupledPlanningCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DecoupledPlanningCalciteQueryTest.java index 7d7559f85279..c1552b0cfb85 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DecoupledPlanningCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DecoupledPlanningCalciteQueryTest.java @@ -187,21 +187,21 @@ public void testGroupByWithSortOnPostAggregationNoTopNContext() @Override @Ignore - public void testUnplannableQueries() + public void testUnplannableTwoExactCountDistincts() { } @Override @Ignore - public void testUnplannableTwoExactCountDistincts() + public void testUnplannableExactCountDistinctOnSketch() { } @Override @Ignore - public void testUnplannableExactCountDistinctOnSketch() + public void testUnplannableScanOrderByNonTime() { } @@ -338,6 +338,7 @@ public void testFilterOnCurrentTimestampOnView() { } + // When run through decoupled, it expects // dimensions=[DefaultDimensionSpec{dimension='dim2', outputName='d0', outputType='STRING'}, // DefaultDimensionSpec{dimension='dim1', outputName='d1', outputType='STRING'}] From f8801733d3db6efdf5e56956a61da2bbe2052ba5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Sep 2023 13:36:11 -0700 Subject: [PATCH 2/4] cartesian is a word. --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling b/website/.spelling index 10751d69530a..4333cf5dc074 100644 --- a/website/.spelling +++ b/website/.spelling @@ -43,6 +43,7 @@ Base64 Base64-encoded ByteBuffer bottlenecked +cartesian concat CIDR CORS From 8a5292e5cb74be3b4325625b527b1dba357db6f3 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 14 Sep 2023 10:38:12 -0700 Subject: [PATCH 3/4] Adjust tests. --- .../sql/calcite/CalciteJoinQueryTest.java | 37 +++---------------- .../druid/sql/calcite/CalciteQueryTest.java | 2 +- 2 files changed, 6 insertions(+), 33 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java index 26cd0318db35..d0d7935a334d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java @@ -3470,7 +3470,7 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map queryConte // Cannot vectorize due to 'concat' expression. cannotVectorize(); - ScanQuery nullCompatibleModePlan = newScanQueryBuilder() + ScanQuery expectedQuery = newScanQueryBuilder() .dataSource( join( new TableDataSource(CalciteTests.DATASOURCE1), @@ -3496,33 +3496,6 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map queryConte .context(queryContext) .build(); - ScanQuery nonNullCompatibleModePlan = newScanQueryBuilder() - .dataSource( - join( - new TableDataSource(CalciteTests.DATASOURCE1), - new QueryDataSource( - GroupByQuery - .builder() - .setDataSource(new LookupDataSource("lookyloo")) - .setInterval(querySegmentSpec(Filtration.eternity())) - .setGranularity(Granularities.ALL) - .setVirtualColumns( - expressionVirtualColumn("v0", "concat(\"k\",'')", ColumnType.STRING) - ) - .setDimensions(new DefaultDimensionSpec("v0", "d0")) - .build() - ), - "j0.", - equalsCondition(makeColumnExpression("dim1"), makeColumnExpression("j0.d0")), - JoinType.LEFT - ) - ) - .intervals(querySegmentSpec(Filtration.eternity())) - .columns("dim1", "j0.d0") - .filters(notNull("j0.d0")) - .context(queryContext) - .build(); - boolean isJoinFilterRewriteEnabled = queryContext.getOrDefault(QueryContexts.JOIN_FILTER_REWRITE_ENABLE_KEY, true) .toString() .equals("true"); @@ -3532,7 +3505,7 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map queryConte + "LEFT JOIN (select k || '' as k from lookup.lookyloo group by 1) l1 ON foo.dim1 = l1.k\n" + "WHERE l1.k IS NOT NULL\n", queryContext, - ImmutableList.of(NullHandling.sqlCompatible() ? nullCompatibleModePlan : nonNullCompatibleModePlan), + ImmutableList.of(expectedQuery), NullHandling.sqlCompatible() || !isJoinFilterRewriteEnabled ? ImmutableList.of(new Object[]{"abc", "abc"}) : ImmutableList.of( @@ -4511,7 +4484,7 @@ public void testJoinWithEquiAndNonEquiCondition(Map queryContext cannotVectorize(); testQuery( - "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6", + "SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0", queryContext, ImmutableList.of( newScanQueryBuilder() @@ -4531,12 +4504,12 @@ public void testJoinWithEquiAndNonEquiCondition(Map queryContext JoinType.INNER ) ) - .virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.FLOAT)) + .virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE)) .intervals(querySegmentSpec(Filtration.eternity())) .filters( and( expressionFilter("(\"m1\" == \"j0.m1\")"), - equality("v0", 6f, ColumnType.FLOAT) + equality("v0", 6.0, ColumnType.DOUBLE) ) ) .columns("j0.m1", "m1") diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index db7f1c7ed401..042b17368278 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -5694,7 +5694,7 @@ public void testUnplannableTwoExactCountDistincts() assertQueryIsUnplannable( PLANNER_CONFIG_NO_HLL, "SELECT dim2, COUNT(distinct dim1), COUNT(distinct dim2) FROM druid.foo GROUP BY dim2", - "SQL requires a join with 'IS_NOT_DISTINCT_FROM' condition that is not supported." + "SQL query requires 'IS NOT DISTINCT FROM' operator that is not supported." ); } From 6e35a147e2e84b734564fc399030f3520f89af99 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 15 Sep 2023 08:37:58 -0700 Subject: [PATCH 4/4] Update docs/querying/datasource.md Co-authored-by: Benedict Jin --- docs/querying/datasource.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md index 947df3492efd..333461b729c0 100644 --- a/docs/querying/datasource.md +++ b/docs/querying/datasource.md @@ -322,7 +322,7 @@ you to join arbitrarily many datasources. In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form -(see [Joins in SQL](#joins-in-sql) execute as part of a native join. Other kinds of conditions execute as a cross join +(see [Joins in SQL](#joins-in-sql)) execute as part of a native join. Other kinds of conditions execute as a cross join (cartesian product) plus a filter. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and