Add worker status and duration metrics in live and task reports #15180

Merged
85 changes: 45 additions & 40 deletions docs/api-reference/sql-ingestion-api.md


Original file line number Diff line number Diff line change
@@ -2221,11 +2221,13 @@ private static MSQStatusReport makeStatusReport(
{
int pendingTasks = -1;
int runningTasks = 1;
Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new HashMap<>();

if (taskLauncher != null) {
WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
workerStatsMap = taskLauncher.getWorkerStats();
}

SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
@@ -2236,6 +2238,7 @@ private static MSQStatusReport makeStatusReport(
errorReports,
queryStartTime,
queryDuration,
workerStatsMap,
pendingTasks,
runningTasks,
status
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@

package org.apache.druid.msq.indexing;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -47,6 +48,8 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -55,6 +58,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -111,7 +115,7 @@ private enum State
// Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added
Contributor:
Let's update the comments as well.

// here once they are submitted for running, but before they are fully started up.
// taskId -> taskTracker
- private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
+ private final Map<String, TaskTracker> taskTrackers = Collections.synchronizedMap(new LinkedHashMap<>());

// Set of tasks which are issued a cancel request by the controller.
private final Set<String> canceledWorkerTasks = ConcurrentHashMap.newKeySet();
@@ -348,6 +352,66 @@ public boolean isTaskLatest(String taskId)
}
}

public static class WorkerStats
{
String workerId;
Contributor:
Let's mark these fields final?

Contributor Author:
Can't, because of the default constructor.

TaskState state;
long duration;

/**
* For JSON deserialization only
*/
public WorkerStats()
Contributor:
We actually only want serialization, right? So this constructor can go, no?

Contributor Author:
Many tests in SQLMSQStatementResourceTypeTest fail without it because they require deserialization, such as testReplaceAll, testWithDurableStorage, and testResultFormatWithParamInSelect.

{
}

public WorkerStats(String workerId, TaskState state, long duration)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
}

@JsonProperty
public String getWorkerId()
{
return workerId;
}

@JsonProperty
public TaskState getState()
Contributor:
We should also document these properties in docs/api-reference/sql-ingestion-api.md.

Contributor Author:
Added these properties.

{
return state;
}

@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
}

public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();

for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
Contributor:
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
was only accessed from the main worker loop until now.
With this change, taskTrackers can be accessed from the Jetty thread or the main controller impl thread.

So we should either make TaskTracker thread-safe

Contributor Author:
Good catch! It is now wrapped in Collections.synchronizedMap(). I believe this won't have any significant impact on performance.

Contributor:
Let's use a ConcurrentHashMap?

Returns a synchronized (thread-safe) map backed by the specified map. In order to guarantee serial access, it is critical that all access to the backing map is accomplished through the returned map.
It is imperative that the user manually synchronize on the returned map when traversing any of its collection views via Iterator, Spliterator or Stream:
   Map m = Collections.synchronizedMap(new HashMap());
       ...
   Set s = m.keySet();  // Needn't be in synchronized block
       ...
   synchronized (m) {  // Synchronizing on m, not s!
       Iterator i = s.iterator(); // Must be in synchronized block
       while (i.hasNext())
           foo(i.next());
   }
  

Since it's very easy to use s.iterator() without synchronizing, no?
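
To make the iteration caveat in the quoted Javadoc concrete, here is a minimal sketch. It is not the PR's code: the class `SynchronizedMapIteration`, the field `workerNumberByTaskId`, and the methods `add` and `snapshot` are hypothetical names chosen for illustration. It shows a traversal of `entrySet()` that holds the wrapper's monitor for the whole loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: class, field, and method names here are hypothetical, not from the PR.
class SynchronizedMapIteration
{
  // Insertion-ordered map; the wrapper synchronizes each individual call such as put() or get().
  private final Map<String, Integer> workerNumberByTaskId =
      Collections.synchronizedMap(new LinkedHashMap<>());

  public void add(String taskId, int workerNumber)
  {
    workerNumberByTaskId.put(taskId, workerNumber);
  }

  public List<String> snapshot()
  {
    final List<String> result = new ArrayList<>();
    // Traversing a collection view is not atomic, so hold the wrapper's own monitor for the whole loop.
    synchronized (workerNumberByTaskId) {
      for (Map.Entry<String, Integer> entry : workerNumberByTaskId.entrySet()) {
        result.add(entry.getKey() + "=" + entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    final SynchronizedMapIteration launcher = new SynchronizedMapIteration();
    launcher.add("task-b", 1);
    launcher.add("task-a", 0);
    System.out.println(launcher.snapshot()); // entries come back in insertion order
  }
}
```

One trade-off to weigh: a ConcurrentHashMap removes the need for the explicit lock during iteration, but it does not preserve insertion order, and the existing comment on taskTrackers says the order of the key set matters.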


TaskTracker taskTracker = taskEntry.getValue();

workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(),
taskTracker.status.getStatusCode(),
taskTracker.status.getDuration()
));
}

for (List<WorkerStats> workerStatsList : workerStats.values()) {
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
}
return workerStats;
}

private void mainLoop()
{
try {
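
The grouping done by getWorkerStats() above can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: `WorkerStatsGrouping`, `Attempt`, and `groupTaskIds` are hypothetical names, and plain task ID strings stand in for WorkerStats objects. It shows how a TreeMap keeps worker numbers in ascending order while each worker's list is sorted by ID.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: all names in this class are hypothetical stand-ins.
class WorkerStatsGrouping
{
  // Simplified record of one task attempt for a given worker number.
  static final class Attempt
  {
    final int workerNumber;
    final String taskId;

    Attempt(int workerNumber, String taskId)
    {
      this.workerNumber = workerNumber;
      this.taskId = taskId;
    }
  }

  public static Map<Integer, List<String>> groupTaskIds(List<Attempt> attempts)
  {
    // TreeMap iterates its keys in ascending worker-number order.
    final Map<Integer, List<String>> byWorker = new TreeMap<>();
    for (Attempt attempt : attempts) {
      byWorker.computeIfAbsent(attempt.workerNumber, k -> new ArrayList<>()).add(attempt.taskId);
    }
    // Sort each worker's attempts by ID so the output is deterministic.
    for (List<String> taskIds : byWorker.values()) {
      taskIds.sort(Comparator.naturalOrder());
    }
    return byWorker;
  }

  public static void main(String[] args)
  {
    final List<Attempt> attempts = Arrays.asList(
        new Attempt(1, "w1_0"),
        new Attempt(0, "w0_1"),
        new Attempt(0, "w0_0")
    );
    System.out.println(groupTaskIds(attempts)); // prints {0=[w0_0, w0_1], 1=[w1_0]}
  }
}
```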
Original file line number Diff line number Diff line change
@@ -25,12 +25,15 @@
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MSQStatusReport
@@ -47,6 +50,8 @@ public class MSQStatusReport

private final long durationMs;

private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats;

private final int pendingTasks;

private final int runningTasks;
@@ -61,6 +66,7 @@ public MSQStatusReport(
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@@ -71,6 +77,7 @@ public MSQStatusReport(
this.warningReports = warningReports != null ? warningReports : Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
this.workerStats = workerStats;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
@@ -123,6 +130,12 @@ public long getDurationMs()
return durationMs;
}

@JsonProperty("workers")
public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkerStats()
{
return workerStats;
}

@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -107,7 +108,7 @@ public void testSerdeResultsReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -172,7 +173,7 @@ public void testSerdeErrorReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -220,7 +221,7 @@ public void testWriteTaskReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
- new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
+ new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Original file line number Diff line number Diff line change
@@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Original file line number Diff line number Diff line change
@@ -67,6 +67,7 @@ public void testDistinctPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -105,6 +106,7 @@ public void testOnePartitionOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -144,6 +146,7 @@ public void testCommonPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -181,6 +184,7 @@ public void testNullChannelCounters()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -220,6 +224,7 @@ public void testConsecutivePartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null