Skip to content

Commit

Permalink
[MSQ] Handle dimensionless group by queries with partitioning, and mu…
Browse files Browse the repository at this point in the history
…ltiple workers (#14678)

* fixup

* add ut

* review
  • Loading branch information
LakshSingla authored Jul 29, 2023
1 parent 25df122 commit 8232c03
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public class StageDefinition
this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
this.frameReader = Suppliers.memoize(() -> FrameReader.create(signature))::get;

if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().getColumns().isEmpty()) {
if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().isEmpty()) {
throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", shuffleSpec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ public QueryDefinition makeQueryDefinition(
// (i.e. no GROUP BY clause)
// __time in such queries is generated using either an aggregator (e.g. sum(metric) as __time) or using a
// post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
if (intermediateClusterBy.getColumns().isEmpty() && resultClusterBy.isEmpty()) {
if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
// Ignore shuffleSpecFactory, since we know only a single partition will come out, and we can save some effort.
shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation = ShuffleSpecFactories.singlePartition();
} else if (doOrderBy) {
shuffleSpecFactoryPreAggregation = ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition()
: ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPostAggregation = doLimitOrOffset
? ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,40 @@ public void testInsertOnFoo1WithTimeAggregator()

}

@Test
public void testInsertOnFoo1WithTimeAggregatorAndMultipleWorkers()
{
Map<String, Object> localContext = new HashMap<>(context);
localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, WorkerAssignmentStrategy.MAX.name());
localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);

RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.build();

testIngestQuery().setSql(
"INSERT INTO foo1 "
+ "SELECT MILLIS_TO_TIMESTAMP((SUM(CAST(\"m1\" AS BIGINT)))) AS __time "
+ "FROM foo "
+ "PARTITIONED BY DAY"
)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(localContext)
.setExpectedSegment(
ImmutableSet.of(
SegmentId.of("foo1", Intervals.of("1970-01-01/P1D"), "test", 0)
)
)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{21L}
)
)
.verifyResults();
}


@Test
public void testInsertOnFoo1WithTimePostAggregator()
{
Expand Down

0 comments on commit 8232c03

Please sign in to comment.