diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 067abfdb0fb2..29cd39f165ad 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -48,6 +48,11 @@
druid-indexing-hadoop
${project.parent.version}
+
+ org.apache.druid
+ druid-sql
+ ${project.parent.version}
+
io.dropwizard.metrics
metrics-core
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 9ebfb96b5677..6be1c0085338 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -45,6 +45,7 @@
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.RouterClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
@@ -120,6 +121,7 @@ public class TaskToolbox
private final AppenderatorsManager appenderatorsManager;
private final OverlordClient overlordClient;
private final CoordinatorClient coordinatorClient;
+ private final RouterClient routerClient;
// Used by only native parallel tasks
private final IntermediaryDataManager intermediaryDataManager;
@@ -165,6 +167,7 @@ public TaskToolbox(
AppenderatorsManager appenderatorsManager,
OverlordClient overlordClient,
CoordinatorClient coordinatorClient,
+ RouterClient routerClient,
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@@ -207,6 +210,7 @@ public TaskToolbox(
this.appenderatorsManager = appenderatorsManager;
this.overlordClient = overlordClient;
this.coordinatorClient = coordinatorClient;
+ this.routerClient = routerClient;
this.supervisorTaskClientProvider = supervisorTaskClientProvider;
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
@@ -448,6 +452,11 @@ public CoordinatorClient getCoordinatorClient()
return coordinatorClient;
}
+ public RouterClient getRouterClient()
+ {
+ return routerClient;
+ }
+
public ParallelIndexSupervisorTaskClientProvider getSupervisorTaskClientProvider()
{
return supervisorTaskClientProvider;
@@ -538,6 +547,7 @@ public static class Builder
private AppenderatorsManager appenderatorsManager;
private OverlordClient overlordClient;
private CoordinatorClient coordinatorClient;
+ private RouterClient routerClient;
private IntermediaryDataManager intermediaryDataManager;
private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider;
private ShuffleClient shuffleClient;
@@ -584,6 +594,7 @@ public Builder(TaskToolbox other)
this.appenderatorsManager = other.appenderatorsManager;
this.overlordClient = other.overlordClient;
this.coordinatorClient = other.coordinatorClient;
+ this.routerClient = other.routerClient;
this.intermediaryDataManager = other.intermediaryDataManager;
this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
this.shuffleClient = other.shuffleClient;
@@ -793,6 +804,12 @@ public Builder coordinatorClient(final CoordinatorClient coordinatorClient)
return this;
}
+ public Builder routerClient(final RouterClient routerClient)
+ {
+ this.routerClient = routerClient;
+ return this;
+ }
+
public Builder intermediaryDataManager(final IntermediaryDataManager intermediaryDataManager)
{
this.intermediaryDataManager = intermediaryDataManager;
@@ -861,6 +878,7 @@ public TaskToolbox build()
appenderatorsManager,
overlordClient,
coordinatorClient,
+ routerClient,
supervisorTaskClientProvider,
shuffleClient,
taskLogPusher,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 288d89919b98..a9e87566c01f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -47,6 +47,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.RouterClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
@@ -105,6 +106,7 @@ public class TaskToolboxFactory
private final AppenderatorsManager appenderatorsManager;
private final OverlordClient overlordClient;
private final CoordinatorClient coordinatorClient;
+ private final RouterClient routerClient;
// Used by only native parallel tasks
private final IntermediaryDataManager intermediaryDataManager;
@@ -149,6 +151,7 @@ public TaskToolboxFactory(
AppenderatorsManager appenderatorsManager,
OverlordClient overlordClient,
CoordinatorClient coordinatorClient,
+ RouterClient routerClient,
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@@ -189,6 +192,7 @@ public TaskToolboxFactory(
this.appenderatorsManager = appenderatorsManager;
this.overlordClient = overlordClient;
this.coordinatorClient = coordinatorClient;
+ this.routerClient = routerClient;
this.supervisorTaskClientProvider = supervisorTaskClientProvider;
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
@@ -251,6 +255,7 @@ public TaskToolbox build(TaskConfig config, Task task)
// Calls will still eventually fail if problems persist.
.overlordClient(overlordClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()))
.coordinatorClient(coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour()))
+ .routerClient(routerClient)
.supervisorTaskClientProvider(supervisorTaskClientProvider)
.shuffleClient(shuffleClient)
.taskLogPusher(taskLogPusher)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index c068aa1433fb..2005f0b513d7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -30,6 +30,7 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -46,6 +47,7 @@
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
@@ -93,10 +95,12 @@
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
@@ -104,6 +108,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.sql.Timestamp;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
@@ -473,6 +478,45 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
segmentCacheManagerFactory,
getMetricBuilder()
);
+
+ if (getContextValue(Tasks.USE_MSQ_COMPACTION, false)) {
+ for (ParallelIndexIngestionSpec ingestionSpec : ingestionSpecs) {
+ final PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec();
+ List clusteredBy = new ArrayList<>();
+ if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
+ clusteredBy.addAll(((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions());
+ }
+ final MSQReplaceCompaction msqReplaceQuery = new MSQReplaceCompaction(
+ getDataSource(),
+ ((DruidInputSource) ingestionSpec.getIOConfig().getInputSource()).getInterval(),
+ ingestionSpec.getDataSchema().getDimensionsSpec().getDimensionNames(),
+ Arrays.stream(ingestionSpec.getDataSchema().getAggregators())
+ .map(AggregatorFactory::getName)
+ .collect(Collectors.toList()),
+ GranularityType
+ .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getQueryGranularity()),
+ GranularityType
+ .fromGranularity(ingestionSpec.getDataSchema().getGranularitySpec().getSegmentGranularity()),
+ ingestionSpec.getDataSchema().getGranularitySpec().isRollup(),
+ clusteredBy,
+ ingestionSpec.getTuningConfig().getMaxNumConcurrentSubTasks(),
+ getPriority()
+ );
+ final SqlQuery sqlQuery = new SqlQuery(
+ msqReplaceQuery.buildQuery(),
+ null,
+ true,
+ true,
+ true,
+ msqReplaceQuery.getContext(),
+ null
+ );
+ log.info("Submitting SQL replace: [%s]", sqlQuery);
+ // NOTE(review): this blocks with no timeout and only waits for query
+ // *submission*, not for the launched MSQ task to finish — confirm whether
+ // the compaction task should track the MSQ task to completion before
+ // reporting success below.
+ toolbox.getRouterClient().runQuery(sqlQuery).get();
+ }
+ return TaskStatus.success(getId());
+ }
+
final List indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
.mapToObj(i -> {
@@ -1338,6 +1382,199 @@ public CompactionTask build()
}
}
+ public static class MSQReplaceCompaction
+ {
+ private static final String MAX_NUM_TASKS = "maxNumTasks";
+
+ private static final String FINALIZE_AGGREGATIONS = "finalizeAggregations";
+
+ private static final String SOURCE_TYPE = "sourceType";
+
+ private static final Set QUERY_GRANULARITIES = ImmutableSet.of(
+ GranularityType.NONE,
+ GranularityType.SECOND,
+ GranularityType.MINUTE,
+ GranularityType.HOUR,
+ GranularityType.DAY,
+ GranularityType.MONTH,
+ GranularityType.QUARTER,
+ GranularityType.YEAR
+ );
+
+ private static final Set SEGMENT_GRANULARITIES = ImmutableSet.of(
+ // NOTE(review): "PARTITIONED BY NONE" is not valid MSQ SQL — confirm
+ // whether NONE should be removed from this set (ALL is the coarsest
+ // valid segment granularity).
+ GranularityType.NONE,
+ GranularityType.SECOND,
+ GranularityType.MINUTE,
+ GranularityType.HOUR,
+ GranularityType.DAY,
+ GranularityType.MONTH,
+ GranularityType.QUARTER,
+ GranularityType.YEAR,
+ GranularityType.ALL
+ );
+
+ private final List dimensions;
+
+ private final List metrics;
+
+ private final Interval interval;
+
+ private final String datasource;
+
+ private final GranularityType queryGranularity;
+
+ private final GranularityType segmentGranularity;
+
+ private final boolean rollup;
+
+ private final List clusterByDimensions;
+
+ private final Map context;
+
+ public MSQReplaceCompaction(
+ String datasource,
+ Interval interval,
+ List dimensions,
+ List metrics,
+ GranularityType queryGranularity,
+ GranularityType segmentGranularity,
+ boolean rollup,
+ List clusterByDimensions,
+ int numSubTasks,
+ int priority
+ )
+ {
+ this.context = ImmutableMap.of(
+ MAX_NUM_TASKS,
+ numSubTasks + 1,
+ FINALIZE_AGGREGATIONS,
+ false,
+ SOURCE_TYPE,
+ "sql",
+ Tasks.PRIORITY_KEY,
+ priority
+ );
+ this.datasource = datasource;
+ this.interval = interval;
+ this.dimensions = dimensions;
+ this.metrics = metrics;
+ if (queryGranularity != null && !QUERY_GRANULARITIES.contains(queryGranularity)) {
+ throw new IAE("Invalid query granularity: " + queryGranularity);
+ }
+ this.queryGranularity = queryGranularity;
+ if (!SEGMENT_GRANULARITIES.contains(segmentGranularity)) {
+ throw new IAE("Invalid segment granularity: " + segmentGranularity);
+ }
+ this.segmentGranularity = segmentGranularity;
+ this.rollup = rollup;
+ this.clusterByDimensions = clusterByDimensions;
+ }
+
+ public Map getContext()
+ {
+ return this.context;
+ }
+
+ public String buildQuery()
+ {
+ // NOTE(review): the datasource, dimension, and metric names are
+ // interpolated unquoted; names that are SQL keywords or contain special
+ // characters will produce invalid SQL. Consider double-quoting all
+ // identifiers when building the statement.
+ return buildFrom(
+ datasource,
+ makeWhereExpression(),
+ makeSelectExpression(),
+ makeFromExpression(),
+ makeGroupByExpression(),
+ makePartitionedByExpression(),
+ makeClusteredByExpression()
+ );
+ }
+
+ private String buildFrom(
+ final String datasource,
+ final String whereExpression,
+ final String selectExpression,
+ final String fromExpression,
+ final String groupByExpression,
+ final String partitionedByExpression,
+ final String clusteredByExpression
+ )
+ {
+ return "REPLACE INTO " + datasource + "\n"
+ + "OVERWRITE " + whereExpression + "\n"
+ + selectExpression + "\n"
+ + fromExpression + "\n"
+ + groupByExpression
+ + partitionedByExpression
+ + clusteredByExpression;
+ }
+
+ private String makeWhereExpression()
+ {
+ // Format the bounds in the interval's own chronology (UTC for Druid
+ // segment intervals). The previous java.sql.Timestamp.toString() approach
+ // rendered the millis in the JVM default time zone, silently shifting the
+ // compaction bounds on any non-UTC host.
+ final String start = interval.getStart().toString("yyyy-MM-dd HH:mm:ss");
+ final String end = interval.getEnd().toString("yyyy-MM-dd HH:mm:ss");
+
+ return "WHERE __time >= TIMESTAMP '" + start + "'"
+ + " AND __time < TIMESTAMP '" + end + "'";
+ }
+
+ private String makeTimeExpression()
+ {
+ if (queryGranularity == null || GranularityType.NONE.equals(queryGranularity)) {
+ return "__time";
+ } else {
+ return "FLOOR (__time TO " + queryGranularity + ") AS __time";
+ }
+ }
+
+ private String makeSelectExpression()
+ {
+ StringBuilder selectBuilder = new StringBuilder();
+ selectBuilder.append("SELECT ").append(makeTimeExpression());
+ for (String dimension : dimensions) {
+ selectBuilder.append(", ");
+ selectBuilder.append(dimension);
+ }
+ for (String metric : metrics) {
+ selectBuilder.append(", ");
+ selectBuilder.append(metric);
+ }
+ return selectBuilder.toString();
+ }
+
+ private String makeFromExpression()
+ {
+ return "FROM " + datasource + " " + makeWhereExpression();
+ }
+
+ private String makeGroupByExpression()
+ {
+ if (rollup) {
+ // NOTE(review): only __time and the dimensions are grouped, but
+ // makeSelectExpression() also selects the raw metric columns, which are
+ // neither aggregated nor listed in the GROUP BY — this yields invalid
+ // SQL for rollup datasources. The metrics need to be wrapped in their
+ // corresponding aggregation expressions.
+ return "GROUP BY " + IntStream.rangeClosed(1, 1 + dimensions.size())
+ .mapToObj(String::valueOf)
+ .collect(Collectors.joining(", "))
+ + "\n";
+ } else {
+ return "";
+ }
+ }
+
+ private String makePartitionedByExpression()
+ {
+ return "PARTITIONED BY " + segmentGranularity;
+ }
+
+ private String makeClusteredByExpression()
+ {
+ if (CollectionUtils.isNullOrEmpty(clusterByDimensions)) {
+ return "";
+ } else {
+ return "\n" + "CLUSTERED BY " + String.join(", ", clusterByDimensions) + "\n";
+ }
+ }
+ }
+
+
/**
* Compcation Task Tuning Config.
*
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 90b752697780..5a5b44ea7f6d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -53,6 +53,7 @@ public class Tasks
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
public static final String USE_SHARED_LOCK = "useSharedLock";
public static final String STORE_EMPTY_COLUMNS_KEY = "storeEmptyColumns";
+ public static final String USE_MSQ_COMPACTION = "useMSQCompaction";
/**
* Context flag denoting if maximum possible values should be used to estimate
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index aaeee9fcd543..6593901046f2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -25,6 +25,7 @@
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -148,6 +149,7 @@ public void setUp() throws IOException
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 7ea82b8f57b0..cce13af921f7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -33,6 +33,7 @@
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
@@ -1641,6 +1642,7 @@ public void close()
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c7f6168d85da..46e5174d6325 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -83,6 +83,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
@@ -439,6 +440,21 @@ public void testCompactionTaskEmitter()
ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new NoopEmitter());
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, false);
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, true);
+
+ final CompactionTask.MSQReplaceCompaction msqCompaction = new CompactionTask.MSQReplaceCompaction(
+ DATA_SOURCE,
+ COMPACTION_INTERVAL,
+ ImmutableList.of("d1", "d2", "d3", "d4"),
+ ImmutableList.of("m1", "m2"),
+ null,
+ GranularityType.HOUR,
+ false,
+ ImmutableList.of("d1"),
+ 3,
+ 25
+ );
+ final String sql = msqCompaction.buildQuery();
+ Assert.assertTrue(sql.startsWith("REPLACE INTO " + DATA_SOURCE));
+ Assert.assertTrue(sql.contains("PARTITIONED BY HOUR"));
+ Assert.assertTrue(sql.contains("CLUSTERED BY d1"));
+ Assert.assertEquals(25, msqCompaction.getContext().get(Tasks.PRIORITY_KEY));
}
@Test(expected = IAE.class)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 9881561d61f1..781fbfa36e40 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -1013,6 +1014,7 @@ public void close()
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index e683c828ddc6..3d94fcda7a75 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -22,6 +22,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -131,6 +132,7 @@ public void setup() throws IOException
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 31ec645b3fdd..c6333a44b14c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -36,6 +36,7 @@
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
@@ -695,6 +696,7 @@ public void unannounceSegments(Iterable segments)
appenderatorsManager,
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
index 5c6afdbb61b1..fb8b7f8d42f3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -27,6 +27,7 @@
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
@@ -45,6 +46,7 @@
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.rpc.indexing.RouterClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9Factory;
import org.apache.druid.segment.TestHelper;
@@ -111,6 +113,7 @@ public TestTaskToolboxFactory(
bob.appenderatorsManager,
bob.overlordClient,
bob.coordinatorClient,
+ bob.routerClient,
bob.supervisorTaskClientProvider,
bob.shuffleClient,
bob.taskLogPusher,
@@ -154,6 +157,7 @@ public static class Builder
private AppenderatorsManager appenderatorsManager;
private OverlordClient overlordClient = new NoopOverlordClient();
private CoordinatorClient coordinatorClient = new NoopCoordinatorClient();
+ private RouterClient routerClient = new NoopRouterClient();
private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider;
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
index 6be23407a418..f4f9c0fe5eab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java
@@ -36,6 +36,7 @@
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
@@ -696,6 +697,7 @@ public void close()
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index d36bf1c04bab..b949a8aa0264 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -162,6 +163,7 @@ private WorkerTaskManager createWorkerTaskManager()
new TestAppenderatorsManager(),
overlordClient,
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index aecfe29ab1fa..1357b05f72bb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -28,6 +28,7 @@
import org.apache.curator.test.TestingCluster;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.NoopOverlordClient;
+import org.apache.druid.client.indexing.NoopRouterClient;
import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.IndexingServiceCondition;
@@ -204,6 +205,7 @@ private WorkerTaskMonitor createTaskMonitor()
new TestAppenderatorsManager(),
new NoopOverlordClient(),
new NoopCoordinatorClient(),
+ new NoopRouterClient(),
null,
null,
null,
diff --git a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java
index b4b78390605d..a68db432b93f 100644
--- a/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java
+++ b/processing/src/main/java/org/apache/druid/java/util/common/granularity/GranularityType.java
@@ -185,6 +185,17 @@ public static boolean isStandard(Granularity granularity)
return false;
}
+ /**
+ * Returns the standard {@link GranularityType} whose default granularity
+ * equals the given granularity, or null if it does not correspond to any
+ * standard type (e.g. an arbitrary period granularity). Callers must
+ * handle the null return.
+ */
+ public static GranularityType fromGranularity(Granularity granularity)
+ {
+ final GranularityType[] values = GranularityType.values();
+ for (GranularityType value : values) {
+ if (value.getDefaultGranularity().equals(granularity)) {
+ return value;
+ }
+ }
+ return null;
+ }
+
/**
* Note: This is only an estimate based on the values in period.
* This will not work for complicated periods that represent say 1 year 1 day
diff --git a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
index 51dd2b89d736..1882a52a66fd 100644
--- a/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
+++ b/server/src/main/java/org/apache/druid/rpc/guice/ServiceClientModule.java
@@ -42,6 +42,9 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.rpc.indexing.OverlordClientImpl;
+import org.apache.druid.rpc.indexing.RouterClient;
+import org.apache.druid.rpc.indexing.RouterClientImpl;
+import org.apache.druid.server.router.Router;
import java.util.concurrent.ScheduledExecutorService;
@@ -117,4 +120,30 @@ public CoordinatorClient makeCoordinatorClient(
jsonMapper
);
}
+
+ @Provides
+ @ManageLifecycle
+ @Router
+ public ServiceLocator makeRouterServiceLocator(final DruidNodeDiscoveryProvider discoveryProvider)
+ {
+ return new DiscoveryServiceLocator(discoveryProvider, NodeRole.ROUTER);
+ }
+
+ @Provides
+ @LazySingleton
+ public RouterClient makeRouterClient(
+ @Json final ObjectMapper jsonMapper,
+ @EscalatedGlobal final ServiceClientFactory clientFactory,
+ @Router final ServiceLocator serviceLocator
+ )
+ {
+ return new RouterClientImpl(
+ clientFactory.makeClient(
+ NodeRole.ROUTER.getJsonName(),
+ serviceLocator,
+ StandardRetryPolicy.builder().maxAttempts(CLIENT_MAX_ATTEMPTS).build()
+ ),
+ jsonMapper
+ );
+ }
}
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java
new file mode 100644
index 000000000000..517c8db815f4
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.rpc.indexing;
+
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * High-level Router client.
+ *
+ */
+public interface RouterClient
+{
+ /**
+ * Runs the given SQL query on a Router.
+ *
+ * @param query the query payload to submit (serialized as JSON)
+ *
+ * @return a future that resolves once the query has been accepted by the
+ * Router; the response payload is not exposed to the caller
+ */
+ ListenableFuture runQuery(Object query);
+}
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java
new file mode 100644
index 000000000000..5fde61be648b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/RouterClientImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.rpc.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+/**
+ * Production implementation of {@link RouterClient}.
+ */
+public class RouterClientImpl implements RouterClient
+{
+ private final ServiceClient client;
+ private final ObjectMapper jsonMapper;
+
+ public RouterClientImpl(final ServiceClient client, final ObjectMapper jsonMapper)
+ {
+ this.client = Preconditions.checkNotNull(client, "client");
+ this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+ }
+
+ @Override
+ public ListenableFuture runQuery(final Object query)
+ {
+ // POSTs to the SQL task endpoint (MSQ). The response body — which
+ // contains the launched task's id — is read fully but discarded, so
+ // callers only learn that submission succeeded and cannot track the
+ // resulting task. NOTE(review): consider returning the task id.
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/task")
+ .jsonContent(jsonMapper, query),
+ new BytesFullResponseHandler()
+ ),
+ holder -> null
+ );
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java
new file mode 100644
index 000000000000..35faf98eca51
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopRouterClient.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.rpc.indexing.RouterClient;
+
+public class NoopRouterClient implements RouterClient
+{
+ @Override
+ public ListenableFuture runQuery(Object query)
+ {
+ throw new UnsupportedOperationException();
+ }
+}