diff --git a/docs/api-reference/sql-ingestion-api.md b/docs/api-reference/sql-ingestion-api.md index 3daadfa5085f..5492c3ea46dd 100644 --- a/docs/api-reference/sql-ingestion-api.md +++ b/docs/api-reference/sql-ingestion-api.md @@ -603,6 +603,11 @@ The following table describes the response fields when you retrieve a report for | `multiStageQuery.payload.status.status` | RUNNING, SUCCESS, or FAILED. | | `multiStageQuery.payload.status.startTime` | Start time of the query in ISO format. Only present if the query has started running. | | `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet. | +| `multiStageQuery.payload.status.workers` | Workers for the controller task.| +| `multiStageQuery.payload.status.workers.` | Array of worker tasks including retries. | +| `multiStageQuery.payload.status.workers.[].workerId` | Id of the worker task.| | +| `multiStageQuery.payload.status.workers.[].status` | RUNNING, SUCCESS, or FAILED.| +| `multiStageQuery.payload.status.workers.[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks with status RUNNING.| | `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. | | `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. | | `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. Only present after the segments have been published. | diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c108c7d679e9..362e62462f6d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -2221,11 +2221,13 @@ private static MSQStatusReport makeStatusReport( { int pendingTasks = -1; int runningTasks = 1; + Map> workerStatsMap = new HashMap<>(); if (taskLauncher != null) { WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount(); pendingTasks = workerTaskCount.getPendingWorkerCount(); runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller. + workerStatsMap = taskLauncher.getWorkerStats(); } SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status(); @@ -2236,6 +2238,7 @@ private static MSQStatusReport makeStatusReport( errorReports, queryStartTime, queryDuration, + workerStatsMap, pendingTasks, runningTasks, status diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java index dcc81d868644..c2092e7f24ac 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.indexing; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -47,15 +48,18 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -108,10 +112,11 @@ private enum State @GuardedBy("taskIds") private final IntSet fullyStartedTasks = new IntOpenHashSet(); - // Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added - // here once they are submitted for running, but before they are fully started up. + // Mutable state accessed by mainLoop, ControllerImpl, and jetty (/liveReports) threads. + // Tasks are added here once they are submitted for running, but before they are fully started up. // taskId -> taskTracker - private final Map taskTrackers = new LinkedHashMap<>(); + private final ConcurrentMap taskTrackers = new ConcurrentSkipListMap<>(Comparator.comparingInt( + MSQTasks::workerFromTaskId)); // Set of tasks which are issued a cancel request by the controller. private final Set canceledWorkerTasks = ConcurrentHashMap.newKeySet(); @@ -348,6 +353,70 @@ public boolean isTaskLatest(String taskId) } } + public static class WorkerStats + { + String workerId; + TaskState state; + long duration; + + /** + * For JSON deserialization only + */ + public WorkerStats() + { + } + + public WorkerStats(String workerId, TaskState state, long duration) + { + this.workerId = workerId; + this.state = state; + this.duration = duration; + } + + @JsonProperty + public String getWorkerId() + { + return workerId; + } + + @JsonProperty + public TaskState getState() + { + return state; + } + + @JsonProperty("durationMs") + public long getDuration() + { + return duration; + } + } + + public Map> getWorkerStats() + { + final Map> workerStats = new TreeMap<>(); + + for (Map.Entry taskEntry : taskTrackers.entrySet()) { + + TaskTracker taskTracker = taskEntry.getValue(); + + workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>()) + .add(new WorkerStats(taskEntry.getKey(), + taskTracker.status.getStatusCode(), + // getDuration() returns -1 for running tasks. + // It's not calculated on-the-fly here since + // taskTracker.startTimeMillis marks task + // submission time rather than the actual start. + taskTracker.status.getDuration() + )); + } + + for (List workerStatsList : workerStats.values()) { + workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId)); + } + return workerStats; + } + private void mainLoop() { try { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java index d3864498349d..1dd7d6589031 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQStatusReport.java @@ -25,12 +25,15 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexer.TaskState; import org.apache.druid.msq.exec.SegmentLoadStatusFetcher; +import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher; import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.joda.time.DateTime; import javax.annotation.Nullable; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; public class MSQStatusReport @@ -47,6 +50,8 @@ public class MSQStatusReport private final long durationMs; + private final Map> workerStats; + private final int pendingTasks; private final int runningTasks; @@ -61,6 +66,7 @@ public MSQStatusReport( @JsonProperty("warnings") Collection warningReports, @JsonProperty("startTime") @Nullable DateTime startTime, @JsonProperty("durationMs") long durationMs, + @JsonProperty("workers") Map> workerStats, @JsonProperty("pendingTasks") int pendingTasks, @JsonProperty("runningTasks") int runningTasks, @JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus @@ -71,6 +77,7 @@ public MSQStatusReport( this.warningReports = warningReports != null ? warningReports : Collections.emptyList(); this.startTime = startTime; this.durationMs = durationMs; + this.workerStats = workerStats; this.pendingTasks = pendingTasks; this.runningTasks = runningTasks; this.segmentLoadWaiterStatus = segmentLoadWaiterStatus; @@ -123,6 +130,12 @@ public long getDurationMs() return durationMs; } + @JsonProperty("workers") + public Map> getWorkerStats() + { + return workerStats; + } + @Nullable @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java index 3b49572996ec..4bc3d1075c10 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java @@ -55,6 +55,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -107,7 +108,7 @@ public void testSerdeResultsReport() throws Exception final MSQTaskReport report = new MSQTaskReport( TASK_ID, new MSQTaskReportPayload( - new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status), + new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status), MSQStagesReport.create( QUERY_DEFINITION, ImmutableMap.of(), @@ -172,7 +173,7 @@ public void testSerdeErrorReport() throws Exception final MSQTaskReport report = new MSQTaskReport( TASK_ID, new MSQTaskReportPayload( - new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2, status), + new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status), MSQStagesReport.create( QUERY_DEFINITION, ImmutableMap.of(), @@ -220,7 +221,7 @@ public void testWriteTaskReport() throws Exception final MSQTaskReport report = new MSQTaskReport( TASK_ID, new MSQTaskReportPayload( - new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status), + new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status), MSQStagesReport.create( QUERY_DEFINITION, ImmutableMap.of(), diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index d6572801207f..e3995a4a96c8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null @@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java index 806bd8ebe988..0254a61a2c71 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java @@ -67,6 +67,7 @@ public void testDistinctPartitionsOnEachWorker() new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null @@ -105,6 +106,7 @@ public void testOnePartitionOnEachWorker() new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null @@ -144,6 +146,7 @@ public void testCommonPartitionsOnEachWorker() new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null @@ -181,6 +184,7 @@ public void testNullChannelCounters() new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null @@ -220,6 +224,7 @@ public void testConsecutivePartitionsOnEachWorker() new ArrayDeque<>(), null, 0, + new HashMap<>(), 1, 2, null