Skip to content

Commit

Permalink
SQL: Plan non-equijoin conditions as cross join followed by filter. (#…
Browse files Browse the repository at this point in the history
…14978)

* SQL: Plan non-equijoin conditions as cross join followed by filter.

Druid has previously refused to execute joins with non-equality-based
conditions. This was well-intentioned: the idea was to push people to
write their queries in a different, hopefully more performant way.

But as we're moving towards fuller SQL support, it makes more sense to
allow these conditions to go through with the best plan we can come up
with: a cross join followed by a filter. In some cases this will allow
the query to run, and people will be happy with that. In other cases,
it will run into resource limits during execution. But we should at
least give the query a chance.

This patch also updates the documentation to explain how people can
tell whether their queries are being planned this way.

* cartesian is a word.

* Adjust tests.

* Update docs/querying/datasource.md

Co-authored-by: Benedict Jin <asdf2014@apache.org>

---------

Co-authored-by: Benedict Jin <asdf2014@apache.org>
  • Loading branch information
gianm and asdf2014 authored Sep 19, 2023
1 parent d459df8 commit 4f498e6
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 71 deletions.
36 changes: 20 additions & 16 deletions docs/querying/datasource.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,14 @@ Join datasources allow you to do a SQL-style join of two datasources. Stacking j
you to join arbitrarily many datasources.

In Druid {{DRUIDVERSION}}, joins in native queries are implemented with a broadcast hash-join algorithm. This means
that all datasources other than the leftmost "base" datasource must fit in memory. It also means that the join condition
must be an equality. This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup),
[inline](#inline), and [query](#query) datasources.
that all datasources other than the leftmost "base" datasource must fit in memory. In native queries, the join condition
must be an equality. In SQL, any join condition is accepted, but only equalities of a certain form
(see [Joins in SQL](#joins-in-sql)) execute as part of a native join. Other kinds of conditions execute as a cross join
(cartesian product) plus a filter.

Refer to the [Query execution](query-execution.md#join) page for more details on how queries are executed when you
use join datasources.
This feature is intended mainly to allow joining regular Druid tables with [lookup](#lookup), [inline](#inline), and
[query](#query) datasources. Refer to the [Query execution](query-execution.md#join) page for more details on how
queries are executed when you use join datasources.

#### Joins in SQL

Expand All @@ -335,21 +337,23 @@ SQL joins take the form:
<o1> [ INNER | LEFT [OUTER] ] JOIN <o2> ON <condition>
```

The condition must involve only equalities, but functions are okay, and there can be multiple equalities ANDed together.
Conditions like `t1.x = t2.x`, or `LOWER(t1.x) = t2.x`, or `t1.x = t2.x AND t1.y = t2.y` can all be handled. Conditions
like `t1.x <> t2.x` cannot currently be handled.
Any condition is accepted, but only certain kinds of conditions execute as part of a native join. To execute efficiently
as part of a native join, a condition must be a single clause like the following, or an `AND` of clauses like the
following:

Note that Druid SQL is less rigid than what native join datasources can handle. In cases where a SQL query does
something that is not allowed as-is with a native join datasource, Druid SQL will generate a subquery. This can have
a substantial effect on performance and scalability, so it is something to watch out for. Some examples of when the
SQL layer will generate subqueries include:
- Equality between fields of the same type on each side, like `t1 JOIN t2 ON t1.x = t2.x`.
- Equality between a function call on one side, and a field on the other side, like `t1 JOIN t2 ON LOWER(t1.x) = t2.x`.
- The equality operator may be `=` (which does not match nulls) or `IS NOT DISTINCT FROM` (which does match nulls).

- Joining a regular Druid table to itself, or to another regular Druid table. The native join datasource can accept
a table on the left-hand side, but not the right, so a subquery is needed.
In other cases, Druid will either insert a subquery below the join, or will use a cross join (cartesian product)
followed by a filter. Joins executed in these ways may run into resource or performance constraints. To determine
if your query is using one of these execution paths, run `EXPLAIN PLAN FOR <query>` and look for the following:

- Join conditions where the expressions on either side are of different types.
- `query` type datasources under the `left` or `right` key of your `join` datasource.
- `join` type datasource with `condition` set to `"1"` (cartesian product) followed by a `filter` that encodes the
condition you provided.

- Join conditions where the right-hand expression is not a direct column access.
In these cases, you may be able to improve the performance of your query by rewriting it.

For more information about how Druid translates SQL to native queries, refer to the
[Druid SQL](sql-translation.md) documentation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void testExactCountDistinctWithFilter()

@Ignore
@Override
public void testUnplannableQueries()
public void testUnplannableScanOrderByNonTime()
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ public class CalciteRulesManager
CoreRules.INTERSECT_TO_DISTINCT
);

/**
* Rules from Calcite that are not part of Calcite's standard set, but that we use anyway.
*/
private static final List<RelOptRule> EXTRA_CALCITE_RULES =
ImmutableList.of(
// Useful for planning funky join conditions as filters on top of cross joins.
CoreRules.JOIN_EXTRACT_FILTER
);

/**
* Rules from {@link org.apache.calcite.plan.RelOptRules#ABSTRACT_RELATIONAL_RULES}, minus:
*
Expand Down Expand Up @@ -340,6 +349,7 @@ public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
rules.addAll(BASE_RULES);
rules.addAll(ABSTRACT_RULES);
rules.addAll(ABSTRACT_RELATIONAL_RULES);
rules.addAll(EXTRA_CALCITE_RULES);

if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
rules.addAll(FANCY_JOIN_RULES);
Expand Down
135 changes: 106 additions & 29 deletions sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3470,7 +3470,7 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map<String, Object> queryConte
// Cannot vectorize due to 'concat' expression.
cannotVectorize();

ScanQuery nullCompatibleModePlan = newScanQueryBuilder()
ScanQuery expectedQuery = newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
Expand All @@ -3496,33 +3496,6 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map<String, Object> queryConte
.context(queryContext)
.build();

ScanQuery nonNullCompatibleModePlan = newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
GroupByQuery
.builder()
.setDataSource(new LookupDataSource("lookyloo"))
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setVirtualColumns(
expressionVirtualColumn("v0", "concat(\"k\",'')", ColumnType.STRING)
)
.setDimensions(new DefaultDimensionSpec("v0", "d0"))
.build()
),
"j0.",
equalsCondition(makeColumnExpression("dim1"), makeColumnExpression("j0.d0")),
JoinType.LEFT
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("dim1", "j0.d0")
.filters(notNull("j0.d0"))
.context(queryContext)
.build();

boolean isJoinFilterRewriteEnabled = queryContext.getOrDefault(QueryContexts.JOIN_FILTER_REWRITE_ENABLE_KEY, true)
.toString()
.equals("true");
Expand All @@ -3532,7 +3505,7 @@ public void testLeftJoinSubqueryWithNullKeyFilter(Map<String, Object> queryConte
+ "LEFT JOIN (select k || '' as k from lookup.lookyloo group by 1) l1 ON foo.dim1 = l1.k\n"
+ "WHERE l1.k IS NOT NULL\n",
queryContext,
ImmutableList.of(NullHandling.sqlCompatible() ? nullCompatibleModePlan : nonNullCompatibleModePlan),
ImmutableList.of(expectedQuery),
NullHandling.sqlCompatible() || !isJoinFilterRewriteEnabled
? ImmutableList.of(new Object[]{"abc", "abc"})
: ImmutableList.of(
Expand Down Expand Up @@ -4443,6 +4416,110 @@ public void testCountDistinctOfLookupUsingJoinOperator(Map<String, Object> query
);
}

@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
// cross join with a filter.
cannotVectorize();

testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 > y.m1",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
"1",
JoinType.INNER
)
)
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(expressionFilter("(\"m1\" > \"j0.m1\")"))
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
sortIfSortBased(
ImmutableList.of(
new Object[]{2.0f, 1.0f},
new Object[]{3.0f, 1.0f},
new Object[]{3.0f, 2.0f},
new Object[]{4.0f, 1.0f},
new Object[]{4.0f, 2.0f},
new Object[]{4.0f, 3.0f},
new Object[]{5.0f, 1.0f},
new Object[]{5.0f, 2.0f},
new Object[]{5.0f, 3.0f},
new Object[]{5.0f, 4.0f},
new Object[]{6.0f, 1.0f},
new Object[]{6.0f, 2.0f},
new Object[]{6.0f, 3.0f},
new Object[]{6.0f, 4.0f},
new Object[]{6.0f, 5.0f}
),
1,
0
)
);
}

@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testJoinWithEquiAndNonEquiCondition(Map<String, Object> queryContext)
{
// Native JOIN operator cannot handle the condition, so a SQL JOIN with greater-than is translated into a
// cross join with a filter.
cannotVectorize();

testQuery(
"SELECT x.m1, y.m1 FROM foo x INNER JOIN foo y ON x.m1 = y.m1 AND x.m1 + y.m1 = 6.0",
queryContext,
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
join(
new TableDataSource(CalciteTests.DATASOURCE1),
new QueryDataSource(
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(querySegmentSpec(Filtration.eternity()))
.columns("m1")
.context(queryContext)
.build()
),
"j0.",
"1",
JoinType.INNER
)
)
.virtualColumns(expressionVirtualColumn("v0", "(\"m1\" + \"j0.m1\")", ColumnType.DOUBLE))
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(
and(
expressionFilter("(\"m1\" == \"j0.m1\")"),
equality("v0", 6.0, ColumnType.DOUBLE)
)
)
.columns("j0.m1", "m1")
.context(queryContext)
.build()
),
ImmutableList.of(new Object[]{3.0f, 3.0f})
);
}

@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testUsingSubqueryAsPartOfAndFilter(Map<String, Object> queryContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5633,32 +5633,15 @@ public void testCountStarWithNotOfDegenerateFilter()
}

@Test
public void testUnplannableQueries()
public void testUnplannableScanOrderByNonTime()
{
// Scan can ORDER BY non-time in MSQ.
notMsqCompatible();
// All of these queries are unplannable because they rely on features Druid doesn't support.
// This test is here to confirm that we don't fall back to Calcite's interpreter or enumerable implementation.
// It's also here so when we do support these features, we can have "real" tests for these queries.

final Map<String, String> queries = ImmutableMap.of(
// SELECT query with order by non-__time.
assertQueryIsUnplannable(
"SELECT dim1 FROM druid.foo ORDER BY dim1",
"SQL query requires order by non-time column [[dim1 ASC]], which is not supported.",

// JOIN condition with not-equals (<>).
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON foo.dim2 <> l.k",
"SQL requires a join with 'NOT_EQUALS' condition that is not supported.",

// JOIN condition with a function of both sides.
"SELECT foo.dim1, foo.dim2, l.k, l.v\n"
+ "FROM foo INNER JOIN lookup.lookyloo l ON CHARACTER_LENGTH(foo.dim2 || l.k) > 3\n",
"SQL requires a join with 'GREATER_THAN' condition that is not supported."
"SQL query requires order by non-time column [[dim1 ASC]], which is not supported."
);

for (final Map.Entry<String, String> queryErrorPair : queries.entrySet()) {
assertQueryIsUnplannable(queryErrorPair.getKey(), queryErrorPair.getValue());
}
}

@Test
Expand Down Expand Up @@ -5711,7 +5694,7 @@ public void testUnplannableTwoExactCountDistincts()
assertQueryIsUnplannable(
PLANNER_CONFIG_NO_HLL,
"SELECT dim2, COUNT(distinct dim1), COUNT(distinct dim2) FROM druid.foo GROUP BY dim2",
"SQL requires a join with 'IS_NOT_DISTINCT_FROM' condition that is not supported."
"SQL query requires 'IS NOT DISTINCT FROM' operator that is not supported."
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,21 @@ public void testGroupByWithSortOnPostAggregationNoTopNContext()

@Override
@Ignore
public void testUnplannableQueries()
public void testUnplannableTwoExactCountDistincts()
{

}

@Override
@Ignore
public void testUnplannableTwoExactCountDistincts()
public void testUnplannableExactCountDistinctOnSketch()
{

}

@Override
@Ignore
public void testUnplannableExactCountDistinctOnSketch()
public void testUnplannableScanOrderByNonTime()
{

}
Expand Down Expand Up @@ -338,6 +338,7 @@ public void testFilterOnCurrentTimestampOnView()
{

}

// When run through decoupled, it expects
// dimensions=[DefaultDimensionSpec{dimension='dim2', outputName='d0', outputType='STRING'},
// DefaultDimensionSpec{dimension='dim1', outputName='d1', outputType='STRING'}]
Expand Down
1 change: 1 addition & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Base64
Base64-encoded
ByteBuffer
bottlenecked
cartesian
concat
CIDR
CORS
Expand Down

0 comments on commit 4f498e6

Please sign in to comment.