Add worker status and duration metrics in live and task reports #15180

Merged
@@ -2221,11 +2221,13 @@ private static MSQStatusReport makeStatusReport(
{
int pendingTasks = -1;
int runningTasks = 1;
Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new HashMap<>();

if (taskLauncher != null) {
WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
workerStatsMap = taskLauncher.getWorkerStats();
}

SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
@@ -2236,6 +2238,7 @@ private static MSQStatusReport makeStatusReport(
errorReports,
queryStartTime,
queryDuration,
workerStatsMap,
pendingTasks,
runningTasks,
status
@@ -19,6 +19,7 @@

package org.apache.druid.msq.indexing;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -47,6 +48,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -55,6 +57,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -348,6 +351,68 @@ public boolean isTaskLatest(String taskId)
}
}

public static class WorkerStats
{
String workerId;
Contributor:
Let's mark these fields final?

Contributor Author:
Can't - because of the default constructor.

TaskState state;
long duration;

/**
* For JSON deserialization only
*/
public WorkerStats()
Contributor:
We actually want serialization only, right? So this method can go, no?

Contributor Author:
Many tests in SQLMSQStatementResourceTypeTest fail as they require deserialization -- such as testReplaceAll, testWithDurableStorage, and testResultFormatWithParamInSelect

{
}

public WorkerStats(String workerId, TaskState state, long duration)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
}

@JsonProperty()
Contributor:
Nit: is the () required?

Contributor Author:
Removed. Thanks!

public String getWorkerId()
{
return workerId;
}

@JsonProperty()
public TaskState getState()
Contributor:
We should also document these properties in docs/api-reference/sql-ingestion-api.md

Contributor Author:
Added these properties.

{
return state;
}

@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
}
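
Not part of the PR: a minimal sketch of how a WorkerStats instance could serialize with a plain Jackson ObjectMapper (Druid normally uses its own configured mapper); the worker ID and duration below are invented. The no-arg constructor discussed above is there so the class can also be deserialized, as the tests mentioned in the thread require.

// Illustrative only -- not from the PR. Serializes one WorkerStats with a plain
// Jackson ObjectMapper; Druid's configured mapper is assumed to treat these
// simple @JsonProperty getters the same way. Values are invented.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.WorkerStats;

public class WorkerStatsSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    WorkerStats stats = new WorkerStats("query-abc-worker0_0", TaskState.SUCCESS, 1200L);

    // Expected shape (property order may vary); "durationMs" comes from the
    // explicit name on getDuration():
    // {"workerId":"query-abc-worker0_0","state":"SUCCESS","durationMs":1200}
    System.out.println(new ObjectMapper().writeValueAsString(stats));
  }
}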

public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();

for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
Contributor:
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
was only accessed from the main worker loop until now.
With this change, taskTrackers can be accessed from the Jetty thread or the main ControllerImpl thread.

So we should either make TaskTracker thread safe

Contributor Author:
Good catch! Now wrapped in Collections.synchronizedMap(). I believe this won't have any significant impact on performance.

Contributor:
Let's use a ConcurrentHashMap? From the Collections.synchronizedMap javadoc:

Returns a synchronized (thread-safe) map backed by the specified map. In order to guarantee serial access, it is critical that all access to the backing map is accomplished through the returned map.
It is imperative that the user manually synchronize on the returned map when traversing any of its collection views via Iterator, Spliterator or Stream:
   Map m = Collections.synchronizedMap(new HashMap());
       ...
   Set s = m.keySet();  // Needn't be in synchronized block
       ...
   synchronized (m) {  // Synchronizing on m, not s!
       Iterator i = s.iterator(); // Must be in synchronized block
       while (i.hasNext())
           foo(i.next());
   }
  

Since it's very easy to accidentally use s.iterator() without synchronizing, no?
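
Not part of the PR: a self-contained sketch of the trade-off quoted above. Iterating a Collections.synchronizedMap safely requires holding the map's own lock, while ConcurrentHashMap iterators are weakly consistent and need no external locking.

// Illustrative only -- not from the PR. Contrasts the two options discussed above.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TrackerMapSketch
{
  public static void main(String[] args)
  {
    // Option 1: synchronizedMap -- every traversal must synchronize on the map itself.
    Map<String, String> synced = Collections.synchronizedMap(new HashMap<>());
    synced.put("worker-task-0", "RUNNING");
    synchronized (synced) {
      for (Map.Entry<String, String> e : synced.entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue());
      }
    }

    // Option 2: ConcurrentHashMap -- iteration needs no external lock; iterators
    // are weakly consistent and never throw ConcurrentModificationException.
    Map<String, String> concurrent = new ConcurrentHashMap<>();
    concurrent.put("worker-task-0", "RUNNING");
    for (Map.Entry<String, String> e : concurrent.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}

Either option keeps the tracker map consistent across threads; the ConcurrentHashMap variant simply removes the need to remember the synchronized block when iterating.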


TaskTracker taskTracker = taskEntry.getValue();

long duration = (taskTracker.status.getDuration() == -1
Contributor:
I think it should be okay to remove the -1 duration check and always report taskTracker.status.getDuration().
We always rely on the overlord system to give us the task duration without changing anything.
wdyt?

Contributor Author:
The reason was that duration is always -1 for RUNNING workers (since it gets updated only upon completion), so publishing their duration wouldn't ever be useful.

Contributor:
MSQ tracks the start time as the time it requested to launch the job. I currently do not know if the duration counter in the Overlord is started as soon as the Overlord gets the request or whenever that task goes into the running state.

To test it, what you could do is:

  1. Make changes to the MSQWorkerTaskLauncher to log the report every time liveReports is called.
  2. Start a cluster with, let's say, 4 task slots.
  3. Use 2 slots for other ingestions.
  4. Schedule an MSQ job with 4 slots. (The MSQ job will wait and the worker task launcher will set the start time for each task).
  5. Kill the job running on the other 2 slots.

Check if the taskDuration in the report is going backward.

Contributor Author (@gargvishesh, Oct 20, 2023):

Did the check both from a run and in the code, and found it to be correct: the start time in MSQ is recorded when the task is submitted, whereas in the Overlord it is recorded upon start of the run.

Currently TaskStatus doesn't have any field to record a startTime, so for MSQ to get a worker's startTime from the Overlord, this field needs to be added and persisted in the database upon the worker task's start.

There is no such issue with reporting the query's duration periodically in live reports since it's a single hop (from Overlord to Indexer) and the query's start time is maintained inside the controller -- so the duration can be calculated on the fly.

Since we are more interested in the timings of finished workers, I think for now it is fine to just report the duration as -1 for running worker tasks instead of adding a new field to TaskStatus, which is used in multiple places. But do let me know if you think we should take the route of adding a new field.

Contributor:
Thanks for digging in. Reporting -1 for running tasks SGTM.

&& taskTracker.status.getStatusCode() == TaskState.RUNNING)
? System.currentTimeMillis() - taskTracker.startTimeMillis
: taskTracker.status.getDuration();

workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(), taskTracker.status.getStatusCode(), duration));
}

for (List<WorkerStats> workerStatsList : workerStats.values()) {
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
}
return workerStats;
}

private void mainLoop()
{
try {
@@ -823,5 +888,6 @@ public boolean isRetrying()
{
return isRetrying;
}

}
}
@@ -25,12 +25,15 @@
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MSQStatusReport
@@ -47,6 +50,8 @@ public class MSQStatusReport

private final long durationMs;

private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats;

private final int pendingTasks;

private final int runningTasks;
@@ -61,6 +66,7 @@ public MSQStatusReport(
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
@@ -71,6 +77,7 @@ public MSQStatusReport(
this.warningReports = warningReports != null ? warningReports : Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
this.workerStats = workerStats;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
@@ -123,6 +130,12 @@ public long getDurationMs()
return durationMs;
}

@JsonProperty("workers")
public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkersDurationsMs()
{
return workerStats;
}
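
Not part of the PR: a hedged sketch of how the new "workers" property could be populated and serialized, mirroring the constructor calls in the tests further down. The worker ID and duration are invented, a plain ObjectMapper stands in for Druid's configured mapper, and the import path for MSQStatusReport is assumed from the upstream package layout.

// Illustrative only -- not from the PR. Shows the new "workers" argument being
// passed and how the report might serialize; values and mapper setup are assumptions.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.report.MSQStatusReport; // package path assumed

import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatusReportWorkersSketch
{
  public static void main(String[] args) throws Exception
  {
    // Worker number -> stats for each launched attempt of that worker.
    Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workers = new HashMap<>();
    workers.put(
        0,
        Collections.singletonList(
            new MSQWorkerTaskLauncher.WorkerStats("query-abc-worker0_0", TaskState.SUCCESS, 10340L)
        )
    );

    MSQStatusReport report = new MSQStatusReport(
        TaskState.SUCCESS,
        null,                // errorReport
        new ArrayDeque<>(),  // warnings
        null,                // startTime
        0,                   // durationMs
        workers,             // the new "workers" property
        1,                   // pendingTasks
        2,                   // runningTasks
        null                 // segmentLoadWaiterStatus
    );

    // Expected to contain something like (exact ordering may differ):
    // "workers":{"0":[{"workerId":"query-abc-worker0_0","state":"SUCCESS","durationMs":10340}]}
    System.out.println(new ObjectMapper().writeValueAsString(report));
  }
}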

@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -55,6 +55,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -107,7 +108,7 @@ public void testSerdeResultsReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -172,7 +173,7 @@ public void testSerdeErrorReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -220,7 +221,7 @@ public void testWriteTaskReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
@@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -67,6 +67,7 @@ public void testDistinctPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -105,6 +106,7 @@ public void testOnePartitionOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -144,6 +146,7 @@ public void testCommonPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -181,6 +184,7 @@ public void testNullChannelCounters()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
@@ -220,6 +224,7 @@ public void testConsecutivePartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null