From f6c00171bb2b9c351847c6276718638ac0517212 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 15 Aug 2023 01:03:58 +0530 Subject: [PATCH 1/3] Compaction with MSQ --- .../indexing/common/task/CompactionTask.java | 205 ++++++++++++++++++ .../common/task/CompactionTaskTest.java | 15 ++ .../common/granularity/GranularityType.java | 11 + 3 files changed, 231 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 108d186b75d9..50bc11ddd10f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -30,6 +30,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -46,6 +47,7 @@ import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; @@ -69,6 +71,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.guava.Comparators; @@ -96,6 +99,7 @@ import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Duration; import org.joda.time.Interval; @@ -467,6 +471,31 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getCoordinatorClient(), segmentCacheManagerFactory ); + + for (ParallelIndexIngestionSpec ingestionSpec : ingestionSpecs) { + final PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec(); + List clusteredBy = new ArrayList<>(); + if (partitionsSpec instanceof DimensionRangePartitionsSpec) { + clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()); + } + System.out.println( + new MSQReplaceCompaction( + getDataSource(), + ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), + ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), + Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) + .map(AggregatorFactory::getName) + .collect(Collectors.toList()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), + ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), + clusteredBy, + ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks() + ).buildQuery() + ); + } final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) .mapToObj(i -> { @@ -1314,6 +1343,182 @@ public CompactionTask build() } } + public static class MSQReplaceCompaction + { + private static final String MAX_NUM_TASKS = "maxNumTasks"; + + private static final String FINALIZE_AGGREGATIONS = "finalizeAggregations"; + + private static final Set QUERY_GRANULARITIES = ImmutableSet.of( + GranularityType.NONE, + GranularityType.SECOND, + GranularityType.MINUTE, + GranularityType.HOUR, + GranularityType.DAY, + GranularityType.MONTH, + GranularityType.QUARTER, + GranularityType.YEAR + ); + + private static final Set SEGMENT_GRANULARITIES = ImmutableSet.of( + GranularityType.NONE, + GranularityType.SECOND, + GranularityType.MINUTE, + GranularityType.HOUR, + GranularityType.DAY, + GranularityType.MONTH, + GranularityType.QUARTER, + GranularityType.YEAR, + GranularityType.ALL + ); + + private final List dimensions; + + private final List metrics; + + private final Interval interval; + + private final String datasource; + + private final GranularityType queryGranularity; + + private final GranularityType segmentGranularity; + + private final boolean rollup; + + private final List clusterByDimensions; + + private final Map context; + + public MSQReplaceCompaction( + String datasource, + Interval interval, + List dimensions, + List metrics, + GranularityType queryGranularity, + GranularityType segmentGranularity, + boolean rollup, + List clusterByDimensions, + int numSubTasks + ) + { + this.context = ImmutableMap.of(MAX_NUM_TASKS, numSubTasks + 1, FINALIZE_AGGREGATIONS, false); + this.datasource = datasource; + this.interval = interval; + this.dimensions = dimensions; + this.metrics = metrics; + if (queryGranularity != null && !QUERY_GRANULARITIES.contains(queryGranularity)) { + throw new IAE("Invalid query granularity: " + queryGranularity); + } + this.queryGranularity = queryGranularity; + if (!SEGMENT_GRANULARITIES.contains(segmentGranularity)) { + throw new IAE("Invalid segment granularity: " + segmentGranularity); + } + this.segmentGranularity = segmentGranularity; + this.rollup = rollup; + this.clusterByDimensions = clusterByDimensions; + } + + public Map getContext() + { + return this.context; + } + + public String buildQuery() + { + return buildFrom( + datasource, + makeWhereExpression(), + makeSelectExpression(), + makeFromExpression(), + makeGroupByExpression(), + makePartitionedByExpression(), + makeClusteredByExpression() + ); + } + + private String buildFrom( + final String datasource, + final String whereExpression, + final String selectExpression, + final String fromExpression, + final String groupByExpression, + final String partitionedByExpression, + final String clusteredByExpression + ) + { + return "REPLACE INTO " + datasource + "\n" + + "OVERWRITE " + whereExpression + "\n" + + selectExpression + "\n" + + fromExpression + "\n" + + groupByExpression + + partitionedByExpression + "\n" + + clusteredByExpression; + } + + private String makeWhereExpression() + { + return "WHERE __time >= TIMESTAMP '" + interval.getStart().toString() + "'" + + " AND __time < TIMESTAMP '" + interval.getEnd().toString() + "'"; + } + + private String makeTimeExpression() + { + if (queryGranularity == null || GranularityType.NONE.equals(queryGranularity)) { + return "__time"; + } else { + return "FLOOR (__time TO " + queryGranularity + ") AS __time"; + } + } + + private String makeSelectExpression() + { + StringBuilder selectBuilder = new StringBuilder(); + selectBuilder.append("SELECT ").append(makeTimeExpression()); + for (String dimension : dimensions) { + selectBuilder.append(", "); + selectBuilder.append(dimension); + } + for (String metric : metrics) { + selectBuilder.append(", "); + selectBuilder.append(metric); + } + return selectBuilder.toString(); + } + + private String makeFromExpression() + { + return "FROM " + datasource + " " + makeWhereExpression(); + } + + private String makeGroupByExpression() + { + if (rollup) { + return "GROUP BY " + IntStream.rangeClosed(1, 1 + dimensions.size()) + .mapToObj(String::valueOf) + .collect(Collectors.joining(", ")) + + "\n"; + } else { + return ""; + } + } + + private String makePartitionedByExpression() + { + return "PARTITIONED BY " + segmentGranularity; + } + + private String makeClusteredByExpression() + { + if (CollectionUtils.isNullOrEmpty(clusterByDimensions)) { + return ""; + } else { + return "CLUSTERED BY " + String.join(", ", clusterByDimensions) + "\n"; + } + } + } + + /** * Compcation Task Tuning Config. * diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 76ea03a8176e..0723c1898774 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.core.NoopEmitter; @@ -422,6 +423,20 @@ public void testCompactionTaskEmitter() ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new NoopEmitter()); taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, false); taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, true); + + final CompactionTask.MSQReplaceCompaction msqCompaction = new CompactionTask.MSQReplaceCompaction( + DATA_SOURCE, + COMPACTION_INTERVAL, + ImmutableList.of("d1", "d2", "d3", "d4"), + ImmutableList.of("m1", "m2"), + null, + GranularityType.HOUR, + false, + ImmutableList.of("d1"), + 3 + ); + System.out.println(msqCompaction.buildQuery()); + System.out.println(msqCompaction.getContext()); } @Test(expected = IAE.class) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java index b4b78390605d..a68db432b93f 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java @@ -185,6 +185,17 @@ public static boolean isStandard(Granularity granularity) return false; } + public static GranularityType fromGranularity(Granularity granularity) + { + final GranularityType[] values = GranularityType.values(); + for (GranularityType value : values) { + if (value.getDefaultGranularity().equals(granularity)) { + return value; + } + } + return null; + } + /** * Note: This is only an estimate based on the values in period. * This will not work for complicated periods that represent say 1 year 1 day From fd1040141ba6933ce85f8fb6846e540c8eec3509 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 15 Aug 2023 18:09:48 +0530 Subject: [PATCH 2/3] Submit MSQ tasks from compact task using the new RouterClient --- indexing-service/pom.xml | 5 ++ .../druid/indexing/common/TaskToolbox.java | 18 +++++ .../indexing/common/TaskToolboxFactory.java | 5 ++ .../indexing/common/task/CompactionTask.java | 69 +++++++++++++------ .../indexing/common/TaskToolboxTest.java | 2 + ...penderatorDriverRealtimeIndexTaskTest.java | 2 + .../common/task/RealtimeIndexTaskTest.java | 2 + .../SingleTaskBackgroundRunnerTest.java | 2 + .../indexing/overlord/TaskLifecycleTest.java | 2 + .../overlord/TestTaskToolboxFactory.java | 4 ++ .../SeekableStreamIndexTaskTestBase.java | 2 + .../worker/WorkerTaskManagerTest.java | 2 + .../worker/WorkerTaskMonitorTest.java | 2 + .../druid/rpc/guice/ServiceClientModule.java | 29 ++++++++ .../druid/rpc/indexing/RouterClient.java | 37 ++++++++++ .../druid/rpc/indexing/RouterClientImpl.java | 57 +++++++++++++++ .../client/indexing/NoopRouterClient.java | 32 +++++++++ 17 files changed, 250 insertions(+), 22 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java create mode 100644 server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java create mode 100644 server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 4ba42aac022b..0c3165f2831e 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -48,6 +48,11 @@ druid-indexing-hadoop ${project.parent.version} + + org.apache.druid + druid-sql + ${project.parent.version} + io.dropwizard.metrics metrics-core diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 9ebfb96b5677..6be1c0085338 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -45,6 +45,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; @@ -120,6 +121,7 @@ public class TaskToolbox private final AppenderatorsManager appenderatorsManager; private final OverlordClient overlordClient; private final CoordinatorClient coordinatorClient; + private final RouterClient routerClient; // Used by only native parallel tasks private final IntermediaryDataManager intermediaryDataManager; @@ -165,6 +167,7 @@ public TaskToolbox( AppenderatorsManager appenderatorsManager, OverlordClient overlordClient, CoordinatorClient coordinatorClient, + RouterClient routerClient, ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider, ShuffleClient shuffleClient, TaskLogPusher taskLogPusher, @@ -207,6 +210,7 @@ public TaskToolbox( this.appenderatorsManager = appenderatorsManager; this.overlordClient = overlordClient; this.coordinatorClient = coordinatorClient; + this.routerClient = routerClient; this.supervisorTaskClientProvider = supervisorTaskClientProvider; this.shuffleClient = shuffleClient; this.taskLogPusher = taskLogPusher; @@ -448,6 +452,11 @@ public CoordinatorClient getCoordinatorClient() return coordinatorClient; } + public RouterClient getRouterClient() + { + return routerClient; + } + public ParallelIndexSupervisorTaskClientProvider getSupervisorTaskClientProvider() { return supervisorTaskClientProvider; @@ -538,6 +547,7 @@ public static class Builder private AppenderatorsManager appenderatorsManager; private OverlordClient overlordClient; private CoordinatorClient coordinatorClient; + private RouterClient routerClient; private IntermediaryDataManager intermediaryDataManager; private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider; private ShuffleClient shuffleClient; @@ -584,6 +594,7 @@ public Builder(TaskToolbox other) this.appenderatorsManager = other.appenderatorsManager; this.overlordClient = other.overlordClient; this.coordinatorClient = other.coordinatorClient; + this.routerClient = other.routerClient; this.intermediaryDataManager = other.intermediaryDataManager; this.supervisorTaskClientProvider = other.supervisorTaskClientProvider; this.shuffleClient = other.shuffleClient; @@ -793,6 +804,12 @@ public Builder coordinatorClient(final CoordinatorClient coordinatorClient) return this; } + public Builder routerClient(final RouterClient routerClient) + { + this.routerClient = routerClient; + return this; + } + public Builder intermediaryDataManager(final IntermediaryDataManager intermediaryDataManager) { this.intermediaryDataManager = intermediaryDataManager; @@ -861,6 +878,7 @@ public TaskToolbox build() appenderatorsManager, overlordClient, coordinatorClient, + routerClient, supervisorTaskClientProvider, shuffleClient, taskLogPusher, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 288d89919b98..a9e87566c01f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -47,6 +47,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9Factory; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; @@ -105,6 +106,7 @@ public class TaskToolboxFactory private final AppenderatorsManager appenderatorsManager; private final OverlordClient overlordClient; private final CoordinatorClient coordinatorClient; + private final RouterClient routerClient; // Used by only native parallel tasks private final IntermediaryDataManager intermediaryDataManager; @@ -149,6 +151,7 @@ public TaskToolboxFactory( AppenderatorsManager appenderatorsManager, OverlordClient overlordClient, CoordinatorClient coordinatorClient, + RouterClient routerClient, ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider, ShuffleClient shuffleClient, TaskLogPusher taskLogPusher, @@ -189,6 +192,7 @@ public TaskToolboxFactory( this.appenderatorsManager = appenderatorsManager; this.overlordClient = overlordClient; this.coordinatorClient = coordinatorClient; + this.routerClient = routerClient; this.supervisorTaskClientProvider = supervisorTaskClientProvider; this.shuffleClient = shuffleClient; this.taskLogPusher = taskLogPusher; @@ -251,6 +255,7 @@ public TaskToolbox build(TaskConfig config, Task task) // Calls will still eventually fail if problems persist. .overlordClient(overlordClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour())) .coordinatorClient(coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour())) + .routerClient(routerClient) .supervisorTaskClientProvider(supervisorTaskClientProvider) .shuffleClient(shuffleClient) .taskLogPusher(taskLogPusher) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 50bc11ddd10f..ac87af381066 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -71,7 +71,6 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.guava.Comparators; @@ -95,6 +94,7 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; @@ -107,6 +107,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -478,23 +479,33 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception if (partitionsSpec instanceof DimensionRangePartitionsSpec) { clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()); } - System.out.println( - new MSQReplaceCompaction( - getDataSource(), - ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), - ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), - Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) - .map(AggregatorFactory::getName) - .collect(Collectors.toList()), - GranularityType - .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), - GranularityType - .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), - ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), - clusteredBy, - ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks() - ).buildQuery() + final MSQReplaceCompaction msqReplaceQuery = new MSQReplaceCompaction( + getDataSource(), + ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), + ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), + Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) + .map(AggregatorFactory::getName) + .collect(Collectors.toList()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), + ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), + clusteredBy, + ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks() ); + System.out.println(msqReplaceQuery.buildQuery()); + final SqlQuery sqlQuery = new SqlQuery( + msqReplaceQuery.buildQuery(), + null, + true, + true, + true, + msqReplaceQuery.getContext(), + null + ); + System.out.println(sqlQuery); + toolbox.getRouterClient().runQuery(sqlQuery).get(); } final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) @@ -1349,6 +1360,8 @@ public static class MSQReplaceCompaction private static final String FINALIZE_AGGREGATIONS = "finalizeAggregations"; + private static final String SOURCE_TYPE = "sourceType"; + private static final Set QUERY_GRANULARITIES = ImmutableSet.of( GranularityType.NONE, GranularityType.SECOND, @@ -1402,7 +1415,14 @@ public MSQReplaceCompaction( int numSubTasks ) { - this.context = ImmutableMap.of(MAX_NUM_TASKS, numSubTasks + 1, FINALIZE_AGGREGATIONS, false); + this.context = ImmutableMap.of( + MAX_NUM_TASKS, + numSubTasks + 1, + FINALIZE_AGGREGATIONS, + false, + SOURCE_TYPE, + "sql" + ); this.datasource = datasource; this.interval = interval; this.dimensions = dimensions; @@ -1452,14 +1472,19 @@ private String buildFrom( + selectExpression + "\n" + fromExpression + "\n" + groupByExpression - + partitionedByExpression + "\n" + + partitionedByExpression + clusteredByExpression; } private String makeWhereExpression() { - return "WHERE __time >= TIMESTAMP '" + interval.getStart().toString() + "'" - + " AND __time < TIMESTAMP '" + interval.getEnd().toString() + "'"; + final String start = Arrays.stream(new Timestamp(interval.getStartMillis()).toString().split("\\.")) + .findFirst().get(); + final String end = Arrays.stream(new Timestamp(interval.getEndMillis()).toString().split("\\.")) + .findFirst().get(); + + return "WHERE __time >= TIMESTAMP '" + start + "'" + + " AND __time < TIMESTAMP '" + end + "'"; } private String makeTimeExpression() @@ -1513,7 +1538,7 @@ private String makeClusteredByExpression() if (CollectionUtils.isNullOrEmpty(clusterByDimensions)) { return ""; } else { - return "CLUSTERED BY " + String.join(", ", clusterByDimensions) + "\n"; + return "\n" + "CLUSTERED BY " + String.join(", ", clusterByDimensions) + "\n"; } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index aaeee9fcd543..6593901046f2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; @@ -148,6 +149,7 @@ public void setUp() throws IOException new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 7ea82b8f57b0..cce13af921f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -33,6 +33,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -1641,6 +1642,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9881561d61f1..781fbfa36e40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -1013,6 +1014,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index e683c828ddc6..3d94fcda7a75 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -131,6 +132,7 @@ public void setup() throws IOException new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 31ec645b3fdd..c6333a44b14c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -36,6 +36,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -695,6 +696,7 @@ public void unannounceSegments(Iterable segments) appenderatorsManager, new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java index 5c6afdbb61b1..fb8b7f8d42f3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; @@ -45,6 +46,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9Factory; import org.apache.druid.segment.TestHelper; @@ -111,6 +113,7 @@ public TestTaskToolboxFactory( bob.appenderatorsManager, bob.overlordClient, bob.coordinatorClient, + bob.routerClient, bob.supervisorTaskClientProvider, bob.shuffleClient, bob.taskLogPusher, @@ -154,6 +157,7 @@ public static class Builder private AppenderatorsManager appenderatorsManager; private OverlordClient overlordClient = new NoopOverlordClient(); private CoordinatorClient coordinatorClient = new NoopCoordinatorClient(); + private RouterClient routerClient = new NoopRouterClient(); private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider; private ShuffleClient shuffleClient; private TaskLogPusher taskLogPusher; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 6be23407a418..f4f9c0fe5eab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -36,6 +36,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.ByteEntity; @@ -696,6 +697,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index d36bf1c04bab..b949a8aa0264 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import org.apache.druid.client.coordinator.NoopCoordinatorClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -162,6 +163,7 @@ private WorkerTaskManager createWorkerTaskManager() new TestAppenderatorsManager(), overlordClient, new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index aecfe29ab1fa..1357b05f72bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -28,6 +28,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.IndexingServiceCondition; @@ -204,6 +205,7 @@ private WorkerTaskMonitor createTaskMonitor() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java index 51dd2b89d736..1882a52a66fd 100644 --- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java +++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java @@ -42,6 +42,9 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.OverlordClientImpl; +import org.apache.druid.rpc.indexing.RouterClient; +import org.apache.druid.rpc.indexing.RouterClientImpl; +import org.apache.druid.server.router.Router; import java.util.concurrent.ScheduledExecutorService; @@ -117,4 +120,30 @@ public CoordinatorClient makeCoordinatorClient( jsonMapper ); } + + @Provides + @ManageLifecycle + @Router + public ServiceLocator makeRouterServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider) + { + return new DiscoveryServiceLocator(discoveryProvider, NodeRole.ROUTER); + } + + @Provides + @LazySingleton + public RouterClient makeRouterClient( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Router final ServiceLocator serviceLocator + ) + { + return new RouterClientImpl( + clientFactory.makeClient( + NodeRole.ROUTER.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build() + ), + jsonMapper + ); + } } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java new file mode 100644 index 000000000000..6f6729669da3 --- /dev/null +++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.indexing; + + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * High-level Broker client. + * + */ +public interface RouterClient +{ + /** + * Run a given query on a broker + * + * @return + */ + ListenableFuture runQuery(Object query); +} diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java new file mode 100644 index 000000000000..5fde61be648b --- /dev/null +++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.jboss.netty.handler.codec.http.HttpMethod; + +/** + * Production implementation of {@link RouterClient}. + */ +public class RouterClientImpl implements RouterClient +{ + private final ServiceClient client; + private final ObjectMapper jsonMapper; + + public RouterClientImpl(final ServiceClient client, final ObjectMapper jsonMapper) + { + this.client = Preconditions.checkNotNull(client, "client"); + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); + } + + @Override + public ListenableFuture runQuery(final Object query) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task") + .jsonContent(jsonMapper, query), + new BytesFullResponseHandler() + ), + holder -> null + ); + } +} diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java new file mode 100644 index 000000000000..35faf98eca51 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.rpc.indexing.RouterClient; + +public class NoopRouterClient implements RouterClient +{ + @Override + public ListenableFuture runQuery(Object query) + { + throw new UnsupportedOperationException(); + } +} From 97dcb690437c07cb71351200ac68f97139c541e6 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 16 Aug 2023 09:29:40 +0530 Subject: [PATCH 3/3] Add task context flag --- .../indexing/common/task/CompactionTask.java | 75 ++++++++++--------- .../druid/indexing/common/task/Tasks.java | 1 + .../common/task/CompactionTaskTest.java | 3 +- .../druid/rpc/indexing/RouterClient.java | 4 +- 4 files changed, 46 insertions(+), 37 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 5506351fa4c1..2005f0b513d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -479,40 +479,44 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception getMetricBuilder() ); - for (ParallelIndexIngestionSpec ingestionSpec : ingestionSpecs) { - final PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec(); - List clusteredBy = new ArrayList<>(); - if (partitionsSpec instanceof DimensionRangePartitionsSpec) { - clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()); + if (getContextValue(Tasks.USE_MSQ_COMPACTION, false)) { + for (ParallelIndexIngestionSpec ingestionSpec : ingestionSpecs) { + final PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec(); + List clusteredBy = new ArrayList<>(); + if (partitionsSpec instanceof DimensionRangePartitionsSpec) { + clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()); + } + final MSQReplaceCompaction msqReplaceQuery = new MSQReplaceCompaction( + getDataSource(), + ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), + ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), + Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) + .map(AggregatorFactory::getName) + .collect(Collectors.toList()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), + ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), + clusteredBy, + ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks(), + getPriority() + ); + final SqlQuery sqlQuery = new SqlQuery( + msqReplaceQuery.buildQuery(), + null, + true, + true, + true, + msqReplaceQuery.getContext(), + null + ); + log.info("Submitting SQL replace: [%s]", sqlQuery); + toolbox.getRouterClient().runQuery(sqlQuery).get(); } - final MSQReplaceCompaction msqReplaceQuery = new MSQReplaceCompaction( - getDataSource(), - ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), - ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), - Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) - .map(AggregatorFactory::getName) - .collect(Collectors.toList()), - GranularityType - .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), - GranularityType - .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), - ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), - clusteredBy, - ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks() - ); - System.out.println(msqReplaceQuery.buildQuery()); - final SqlQuery sqlQuery = new SqlQuery( - msqReplaceQuery.buildQuery(), - null, - true, - true, - true, - msqReplaceQuery.getContext(), - null - ); - System.out.println(sqlQuery); - toolbox.getRouterClient().runQuery(sqlQuery).get(); + return TaskStatus.success(getId()); } + final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) .mapToObj(i -> { @@ -1436,7 +1440,8 @@ public MSQReplaceCompaction( GranularityType segmentGranularity, boolean rollup, List clusterByDimensions, - int numSubTasks + int numSubTasks, + int priority ) { this.context = ImmutableMap.of( @@ -1445,7 +1450,9 @@ public MSQReplaceCompaction( FINALIZE_AGGREGATIONS, false, SOURCE_TYPE, - "sql" + "sql", + Tasks.PRIORITY_KEY, + priority ); this.datasource = datasource; this.interval = interval; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index 90b752697780..5a5b44ea7f6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -53,6 +53,7 @@ public class Tasks public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock"; public static final String USE_SHARED_LOCK = "useSharedLock"; public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns"; + public static final String USE_MSQ_COMPACTION = "useMSQCompaction"; /** * Context flag denoting if maximum possible values should be used to estimate diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 5d6cab7382aa..46e5174d6325 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -450,7 +450,8 @@ public void testCompactionTaskEmitter() GranularityType.HOUR, false, ImmutableList.of("d1"), - 3 + 3, + 25 ); System.out.println(msqCompaction.buildQuery()); System.out.println(msqCompaction.getContext()); diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java index 6f6729669da3..517c8db815f4 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java @@ -23,13 +23,13 @@ import com.google.common.util.concurrent.ListenableFuture; /** - * High-level Broker client. + * High-level Router client. * */ public interface RouterClient { /** - * Run a given query on a broker + * Run a given query on a router * * @return */