diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 067abfdb0fb2..29cd39f165ad 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -48,6 +48,11 @@ druid-indexing-hadoop ${project.parent.version} + + org.apache.druid + druid-sql + ${project.parent.version} + io.dropwizard.metrics metrics-core diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 9ebfb96b5677..6be1c0085338 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -45,6 +45,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; @@ -120,6 +121,7 @@ public class TaskToolbox private final AppenderatorsManager appenderatorsManager; private final OverlordClient overlordClient; private final CoordinatorClient coordinatorClient; + private final RouterClient routerClient; // Used by only native parallel tasks private final IntermediaryDataManager intermediaryDataManager; @@ -165,6 +167,7 @@ public TaskToolbox( AppenderatorsManager appenderatorsManager, OverlordClient overlordClient, CoordinatorClient coordinatorClient, + RouterClient routerClient, ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider, ShuffleClient shuffleClient, TaskLogPusher taskLogPusher, @@ -207,6 +210,7 @@ public TaskToolbox( this.appenderatorsManager = appenderatorsManager; this.overlordClient = overlordClient; this.coordinatorClient = coordinatorClient; + this.routerClient = routerClient; this.supervisorTaskClientProvider = supervisorTaskClientProvider; this.shuffleClient = shuffleClient; this.taskLogPusher = taskLogPusher; @@ -448,6 +452,11 @@ public CoordinatorClient getCoordinatorClient() return coordinatorClient; } + public RouterClient getRouterClient() + { + return routerClient; + } + public ParallelIndexSupervisorTaskClientProvider getSupervisorTaskClientProvider() { return supervisorTaskClientProvider; @@ -538,6 +547,7 @@ public static class Builder private AppenderatorsManager appenderatorsManager; private OverlordClient overlordClient; private CoordinatorClient coordinatorClient; + private RouterClient routerClient; private IntermediaryDataManager intermediaryDataManager; private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider; private ShuffleClient shuffleClient; @@ -584,6 +594,7 @@ public Builder(TaskToolbox other) this.appenderatorsManager = other.appenderatorsManager; this.overlordClient = other.overlordClient; this.coordinatorClient = other.coordinatorClient; + this.routerClient = other.routerClient; this.intermediaryDataManager = other.intermediaryDataManager; this.supervisorTaskClientProvider = other.supervisorTaskClientProvider; this.shuffleClient = other.shuffleClient; @@ -793,6 +804,12 @@ public Builder coordinatorClient(final CoordinatorClient coordinatorClient) return this; } + public Builder routerClient(final RouterClient routerClient) + { + this.routerClient = routerClient; + return this; + } + public Builder intermediaryDataManager(final IntermediaryDataManager intermediaryDataManager) { this.intermediaryDataManager = intermediaryDataManager; @@ -861,6 +878,7 @@ public TaskToolbox build() appenderatorsManager, overlordClient, coordinatorClient, + routerClient, supervisorTaskClientProvider, shuffleClient, taskLogPusher, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java index 288d89919b98..a9e87566c01f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java @@ -47,6 +47,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9Factory; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; @@ -105,6 +106,7 @@ public class TaskToolboxFactory private final AppenderatorsManager appenderatorsManager; private final OverlordClient overlordClient; private final CoordinatorClient coordinatorClient; + private final RouterClient routerClient; // Used by only native parallel tasks private final IntermediaryDataManager intermediaryDataManager; @@ -149,6 +151,7 @@ public TaskToolboxFactory( AppenderatorsManager appenderatorsManager, OverlordClient overlordClient, CoordinatorClient coordinatorClient, + RouterClient routerClient, ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider, ShuffleClient shuffleClient, TaskLogPusher taskLogPusher, @@ -189,6 +192,7 @@ public TaskToolboxFactory( this.appenderatorsManager = appenderatorsManager; this.overlordClient = overlordClient; this.coordinatorClient = coordinatorClient; + this.routerClient = routerClient; this.supervisorTaskClientProvider = supervisorTaskClientProvider; this.shuffleClient = shuffleClient; this.taskLogPusher = taskLogPusher; @@ -251,6 +255,7 @@ public TaskToolbox build(TaskConfig config, Task task) // Calls will still eventually fail if problems persist. .overlordClient(overlordClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour())) .coordinatorClient(coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour())) + .routerClient(routerClient) .supervisorTaskClientProvider(supervisorTaskClientProvider) .shuffleClient(shuffleClient) .taskLogPusher(taskLogPusher) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index c068aa1433fb..2005f0b513d7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -30,6 +30,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -46,6 +47,7 @@ import org.apache.druid.indexer.Checks; import org.apache.druid.indexer.Property; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; @@ -93,10 +95,12 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.apache.druid.server.coordinator.duty.CompactSegments; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Duration; import org.joda.time.Interval; @@ -104,6 +108,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.sql.Timestamp; import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; @@ -473,6 +478,45 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception segmentCacheManagerFactory, getMetricBuilder() ); + + if (getContextValue(Tasks.USE_MSQ_COMPACTION, false)) { + for (ParallelIndexIngestionSpec ingestionSpec : ingestionSpecs) { + final PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec(); + List clusteredBy = new ArrayList<>(); + if (partitionsSpec instanceof DimensionRangePartitionsSpec) { + clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()); + } + final MSQReplaceCompaction msqReplaceQuery = new MSQReplaceCompaction( + getDataSource(), + ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(), + ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(), + Arrays.stream(ingestionSpec.getDataSchema().getAggregators()) + .map(AggregatorFactory::getName) + .collect(Collectors.toList()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()), + GranularityType + .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()), + ingestionSpec.getDataSchema().getGranularitySpec().isRollup(), + clusteredBy, + ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks(), + getPriority() + ); + final SqlQuery sqlQuery = new SqlQuery( + msqReplaceQuery.buildQuery(), + null, + true, + true, + true, + msqReplaceQuery.getContext(), + null + ); + log.info("Submitting SQL replace: [%s]", sqlQuery); + toolbox.getRouterClient().runQuery(sqlQuery).get(); + } + return TaskStatus.success(getId()); + } + final List indexTaskSpecs = IntStream .range(0, ingestionSpecs.size()) .mapToObj(i -> { @@ -1338,6 +1382,199 @@ public CompactionTask build() } } + public static class MSQReplaceCompaction + { + private static final String MAX_NUM_TASKS = "maxNumTasks"; + + private static final String FINALIZE_AGGREGATIONS = "finalizeAggregations"; + + private static final String SOURCE_TYPE = "sourceType"; + + private static final Set QUERY_GRANULARITIES = ImmutableSet.of( + GranularityType.NONE, + GranularityType.SECOND, + GranularityType.MINUTE, + GranularityType.HOUR, + GranularityType.DAY, + GranularityType.MONTH, + GranularityType.QUARTER, + GranularityType.YEAR + ); + + private static final Set SEGMENT_GRANULARITIES = ImmutableSet.of( + GranularityType.NONE, + GranularityType.SECOND, + GranularityType.MINUTE, + GranularityType.HOUR, + GranularityType.DAY, + GranularityType.MONTH, + GranularityType.QUARTER, + GranularityType.YEAR, + GranularityType.ALL + ); + + private final List dimensions; + + private final List metrics; + + private final Interval interval; + + private final String datasource; + + private final GranularityType queryGranularity; + + private final GranularityType segmentGranularity; + + private final boolean rollup; + + private final List clusterByDimensions; + + private final Map context; + + public MSQReplaceCompaction( + String datasource, + Interval interval, + List dimensions, + List metrics, + GranularityType queryGranularity, + GranularityType segmentGranularity, + boolean rollup, + List clusterByDimensions, + int numSubTasks, + int priority + ) + { + this.context = ImmutableMap.of( + MAX_NUM_TASKS, + numSubTasks + 1, + FINALIZE_AGGREGATIONS, + false, + SOURCE_TYPE, + "sql", + Tasks.PRIORITY_KEY, + priority + ); + this.datasource = datasource; + this.interval = interval; + this.dimensions = dimensions; + this.metrics = metrics; + if (queryGranularity != null && !QUERY_GRANULARITIES.contains(queryGranularity)) { + throw new IAE("Invalid query granularity: " + queryGranularity); + } + this.queryGranularity = queryGranularity; + if (!SEGMENT_GRANULARITIES.contains(segmentGranularity)) { + throw new IAE("Invalid segment granularity: " + segmentGranularity); + } + this.segmentGranularity = segmentGranularity; + this.rollup = rollup; + this.clusterByDimensions = clusterByDimensions; + } + + public Map getContext() + { + return this.context; + } + + public String buildQuery() + { + return buildFrom( + datasource, + makeWhereExpression(), + makeSelectExpression(), + makeFromExpression(), + makeGroupByExpression(), + makePartitionedByExpression(), + makeClusteredByExpression() + ); + } + + private String buildFrom( + final String datasource, + final String whereExpression, + final String selectExpression, + final String fromExpression, + final String groupByExpression, + final String partitionedByExpression, + final String clusteredByExpression + ) + { + return "REPLACE INTO " + datasource + "\n" + + "OVERWRITE " + whereExpression + "\n" + + selectExpression + "\n" + + fromExpression + "\n" + + groupByExpression + + partitionedByExpression + + clusteredByExpression; + } + + private String makeWhereExpression() + { + final String start = Arrays.stream(new Timestamp(interval.getStartMillis()).toString().split("\\.")) + .findFirst().get(); + final String end = Arrays.stream(new Timestamp(interval.getEndMillis()).toString().split("\\.")) + .findFirst().get(); + + return "WHERE __time >= TIMESTAMP '" + start + "'" + + " AND __time < TIMESTAMP '" + end + "'"; + } + + private String makeTimeExpression() + { + if (queryGranularity == null || GranularityType.NONE.equals(queryGranularity)) { + return "__time"; + } else { + return "FLOOR (__time TO " + queryGranularity + ") AS __time"; + } + } + + private String makeSelectExpression() + { + StringBuilder selectBuilder = new StringBuilder(); + selectBuilder.append("SELECT ").append(makeTimeExpression()); + for (String dimension : dimensions) { + selectBuilder.append(", "); + selectBuilder.append(dimension); + } + for (String metric : metrics) { + selectBuilder.append(", "); + selectBuilder.append(metric); + } + return selectBuilder.toString(); + } + + private String makeFromExpression() + { + return "FROM " + datasource + " " + makeWhereExpression(); + } + + private String makeGroupByExpression() + { + if (rollup) { + return "GROUP BY " + IntStream.rangeClosed(1, 1 + dimensions.size()) + .mapToObj(String::valueOf) + .collect(Collectors.joining(", ")) + + "\n"; + } else { + return ""; + } + } + + private String makePartitionedByExpression() + { + return "PARTITIONED BY " + segmentGranularity; + } + + private String makeClusteredByExpression() + { + if (CollectionUtils.isNullOrEmpty(clusterByDimensions)) { + return ""; + } else { + return "\n" + "CLUSTERED BY " + String.join(", ", clusterByDimensions) + "\n"; + } + } + } + + /** * Compcation Task Tuning Config. * diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java index 90b752697780..5a5b44ea7f6d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java @@ -53,6 +53,7 @@ public class Tasks public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock"; public static final String USE_SHARED_LOCK = "useSharedLock"; public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns"; + public static final String USE_MSQ_COMPACTION = "useMSQCompaction"; /** * Context flag denoting if maximum possible values should be used to estimate diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index aaeee9fcd543..6593901046f2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; @@ -148,6 +149,7 @@ public void setUp() throws IOException new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 7ea82b8f57b0..cce13af921f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -33,6 +33,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -1641,6 +1642,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c7f6168d85da..46e5174d6325 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.core.NoopEmitter; @@ -439,6 +440,21 @@ public void testCompactionTaskEmitter() ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new NoopEmitter()); taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, false); taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, true); + + final CompactionTask.MSQReplaceCompaction msqCompaction = new CompactionTask.MSQReplaceCompaction( + DATA_SOURCE, + COMPACTION_INTERVAL, + ImmutableList.of("d1", "d2", "d3", "d4"), + ImmutableList.of("m1", "m2"), + null, + GranularityType.HOUR, + false, + ImmutableList.of("d1"), + 3, + 25 + ); + System.out.println(msqCompaction.buildQuery()); + System.out.println(msqCompaction.getContext()); } @Test(expected = IAE.class) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 9881561d61f1..781fbfa36e40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -1013,6 +1014,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index e683c828ddc6..3d94fcda7a75 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -131,6 +132,7 @@ public void setup() throws IOException new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 31ec645b3fdd..c6333a44b14c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -36,6 +36,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -695,6 +696,7 @@ public void unannounceSegments(Iterable segments) appenderatorsManager, new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java index 5c6afdbb61b1..fb8b7f8d42f3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; @@ -45,6 +46,7 @@ import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.rpc.indexing.RouterClient; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMergerV9Factory; import org.apache.druid.segment.TestHelper; @@ -111,6 +113,7 @@ public TestTaskToolboxFactory( bob.appenderatorsManager, bob.overlordClient, bob.coordinatorClient, + bob.routerClient, bob.supervisorTaskClientProvider, bob.shuffleClient, bob.taskLogPusher, @@ -154,6 +157,7 @@ public static class Builder private AppenderatorsManager appenderatorsManager; private OverlordClient overlordClient = new NoopOverlordClient(); private CoordinatorClient coordinatorClient = new NoopCoordinatorClient(); + private RouterClient routerClient = new NoopRouterClient(); private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider; private ShuffleClient shuffleClient; private TaskLogPusher taskLogPusher; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 6be23407a418..f4f9c0fe5eab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -36,6 +36,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.ByteEntity; @@ -696,6 +697,7 @@ public void close() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index d36bf1c04bab..b949a8aa0264 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import org.apache.druid.client.coordinator.NoopCoordinatorClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -162,6 +163,7 @@ private WorkerTaskManager createWorkerTaskManager() new TestAppenderatorsManager(), overlordClient, new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index aecfe29ab1fa..1357b05f72bb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -28,6 +28,7 @@ import org.apache.curator.test.TestingCluster; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.client.indexing.NoopOverlordClient; +import org.apache.druid.client.indexing.NoopRouterClient; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.IndexingServiceCondition; @@ -204,6 +205,7 @@ private WorkerTaskMonitor createTaskMonitor() new TestAppenderatorsManager(), new NoopOverlordClient(), new NoopCoordinatorClient(), + new NoopRouterClient(), null, null, null, diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java index b4b78390605d..a68db432b93f 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java @@ -185,6 +185,17 @@ public static boolean isStandard(Granularity granularity) return false; } + public static GranularityType fromGranularity(Granularity granularity) + { + final GranularityType[] values = GranularityType.values(); + for (GranularityType value : values) { + if (value.getDefaultGranularity().equals(granularity)) { + return value; + } + } + return null; + } + /** * Note: This is only an estimate based on the values in period. * This will not work for complicated periods that represent say 1 year 1 day diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java index 51dd2b89d736..1882a52a66fd 100644 --- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java +++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java @@ -42,6 +42,9 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.rpc.indexing.OverlordClientImpl; +import org.apache.druid.rpc.indexing.RouterClient; +import org.apache.druid.rpc.indexing.RouterClientImpl; +import org.apache.druid.server.router.Router; import java.util.concurrent.ScheduledExecutorService; @@ -117,4 +120,30 @@ public CoordinatorClient makeCoordinatorClient( jsonMapper ); } + + @Provides + @ManageLifecycle + @Router + public ServiceLocator makeRouterServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider) + { + return new DiscoveryServiceLocator(discoveryProvider, NodeRole.ROUTER); + } + + @Provides + @LazySingleton + public RouterClient makeRouterClient( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Router final ServiceLocator serviceLocator + ) + { + return new RouterClientImpl( + clientFactory.makeClient( + NodeRole.ROUTER.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build() + ), + jsonMapper + ); + } } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java new file mode 100644 index 000000000000..517c8db815f4 --- /dev/null +++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.indexing; + + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * High-level Router client. + * + */ +public interface RouterClient +{ + /** + * Run a given query on a router + * + * @return + */ + ListenableFuture runQuery(Object query); +} diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java new file mode 100644 index 000000000000..5fde61be648b --- /dev/null +++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.rpc.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.rpc.ServiceClient; +import org.jboss.netty.handler.codec.http.HttpMethod; + +/** + * Production implementation of {@link RouterClient}. + */ +public class RouterClientImpl implements RouterClient +{ + private final ServiceClient client; + private final ObjectMapper jsonMapper; + + public RouterClientImpl(final ServiceClient client, final ObjectMapper jsonMapper) + { + this.client = Preconditions.checkNotNull(client, "client"); + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); + } + + @Override + public ListenableFuture runQuery(final Object query) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task") + .jsonContent(jsonMapper, query), + new BytesFullResponseHandler() + ), + holder -> null + ); + } +} diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java new file mode 100644 index 000000000000..35faf98eca51 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.rpc.indexing.RouterClient; + +public class NoopRouterClient implements RouterClient +{ + @Override + public ListenableFuture runQuery(Object query) + { + throw new UnsupportedOperationException(); + } +}