Add worker status and duration metrics in live and task reports #15180
Changes from 5 commits
613023c
3a23d03
4804c2a
d6dc8d9
50a0eb7
2a8024b
ea490fd
23497f3
e98c4f6
7381596
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.apache.druid.msq.indexing; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.SettableFuture; | ||
|
@@ -47,6 +48,7 @@ | |
|
||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
|
@@ -55,6 +57,7 @@ | |
import java.util.Map; | ||
import java.util.OptionalLong; | ||
import java.util.Set; | ||
import java.util.TreeMap; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
@@ -348,6 +351,68 @@ public boolean isTaskLatest(String taskId) | |
} | ||
} | ||
|
||
public static class WorkerStats | ||
{ | ||
String workerId; | ||
TaskState state; | ||
long duration; | ||
|
||
/** | ||
* For JSON deserialization only | ||
*/ | ||
public WorkerStats() | ||
Review comment: We actually want serialization only, right? So this constructor can go, no?
Reply: Many tests in SQLMSQStatementResourceTypeTest fail as they require deserialization -- such as |
||
{ | ||
} | ||
|
||
public WorkerStats(String workerId, TaskState state, long duration) | ||
{ | ||
this.workerId = workerId; | ||
this.state = state; | ||
this.duration = duration; | ||
} | ||
|
||
@JsonProperty() | ||
Review comment: Nit: is the () required?
Reply: Removed. Thanks! |
||
public String getWorkerId() | ||
{ | ||
return workerId; | ||
} | ||
|
||
@JsonProperty() | ||
public TaskState getState() | ||
Review comment: We should also document these properties in
Reply: Added these properties. |
||
{ | ||
return state; | ||
} | ||
|
||
@JsonProperty("durationMs") | ||
public long getDuration() | ||
{ | ||
return duration; | ||
} | ||
} | ||
|
||
public Map<Integer, List<WorkerStats>> getWorkerStats() | ||
{ | ||
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>(); | ||
|
||
for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) { | ||
Review comment: So we should either make TaskTracker thread safe
Reply: Good catch! Now wrapped in
Review comment: Let's use a ConcurrentHashMap? Since it's very easy to use s.iterator(), no? |
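The ConcurrentHashMap suggestion in this thread can be sketched as follows. This is a minimal illustration, not the PR's code: `WorkerSnapshotSketch` and `Tracker` are hypothetical stand-ins, and the sketch assumes the tracker map is a `ConcurrentHashMap` keyed by task ID. Iterating such a map is weakly consistent, so a report thread can walk it without a lock while the main loop adds entries.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

final class WorkerSnapshotSketch
{
  // Hypothetical stand-in for TaskTracker: only the field this sketch needs.
  static final class Tracker
  {
    final int workerNumber;

    Tracker(int workerNumber)
    {
      this.workerNumber = workerNumber;
    }
  }

  // Groups task IDs by worker number. The entrySet() iterator of a ConcurrentHashMap
  // never throws ConcurrentModificationException, even if another thread
  // inserts or removes entries while this loop is running.
  static Map<Integer, List<String>> snapshot(ConcurrentHashMap<String, Tracker> trackers)
  {
    final Map<Integer, List<String>> byWorker = new TreeMap<>();
    for (Map.Entry<String, Tracker> entry : trackers.entrySet()) {
      byWorker.computeIfAbsent(entry.getValue().workerNumber, k -> new ArrayList<>())
              .add(entry.getKey());
    }
    // Sort each worker's task IDs so the report order is stable.
    for (List<String> ids : byWorker.values()) {
      ids.sort(Comparator.naturalOrder());
    }
    return byWorker;
  }

  public static void main(String[] args)
  {
    ConcurrentHashMap<String, Tracker> trackers = new ConcurrentHashMap<>();
    trackers.put("task-b", new Tracker(0));
    trackers.put("task-a", new Tracker(0));
    trackers.put("task-c", new Tracker(1));
    System.out.println(snapshot(trackers));
  }
}
```

Note that a concurrent map only makes the iteration safe. Mutable fields inside each tracker (such as its status) would still need their own safe publication, for example by being volatile or immutable.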
||
|
||
TaskTracker taskTracker = taskEntry.getValue(); | ||
|
||
long duration = (taskTracker.status.getDuration() == -1 | ||
Review comment: I think it should be okay to remove the -1 duration check and always report taskTracker.status.getDuration().
Reply: The reason was that duration is always
Review comment: MSQ tracks the start time as the time it requested to launch the job. I currently do not know whether the duration counter in the overlord is started as soon as the overlord gets the request or whenever that task goes into the running state. To test it, what you could do is: check if the taskDuration in the report is going backward.
Reply: Did the check both from a run and in code and found it to be correct: the start time in MSQ is recorded when the task is submitted, whereas in the overlord it is recorded upon start of the run. Currently There is no such issue with the periodic reporting of the query's duration in live reports, since it's a single hop (from overlord to indexer) and the query's start time is maintained inside the controller -- so the duration can be calculated on the fly. Since we are more interested in timings of finished workers, I think for now it is fine to just report the duration as
Review comment: Thanks for digging in. Reporting -1 for running tasks SGTM. |
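The outcome of this thread can be sketched in a few lines. This is an illustration under stated assumptions, not the merged code: `DurationSketch`, `State`, and both helper methods are hypothetical, and the timestamps in `main` are made up. It shows why a controller-side estimate measured from submission time can exceed the overlord's final duration measured from run start, which would make the reported value go backward, and how reporting -1 for running tasks avoids that.

```java
final class DurationSketch
{
  // Hypothetical stand-in for Druid's TaskState.
  enum State { RUNNING, SUCCESS, FAILED }

  static final long UNKNOWN_DURATION_MS = -1L;

  // Controller-side estimate: elapsed time since the task was submitted.
  static long controllerEstimateMs(long nowMs, long submittedAtMs)
  {
    return nowMs - submittedAtMs;
  }

  // Value to put in the report: -1 while the task is running,
  // otherwise the duration the overlord recorded for the finished task.
  static long reportedDurationMs(State state, long overlordDurationMs)
  {
    return state == State.RUNNING ? UNKNOWN_DURATION_MS : overlordDurationMs;
  }

  public static void main(String[] args)
  {
    // Made-up timeline: submitted at 0 ms, starts running at 5 s, finishes at 12 s.
    long submittedAtMs = 0L;
    long startedAtMs = 5_000L;
    long finishedAtMs = 12_000L;

    // An estimate taken at 11 s from the submission time is larger than the
    // final duration counted from the run start, so mixing the two would
    // make the reported duration shrink once the task finishes.
    long liveEstimate = controllerEstimateMs(11_000L, submittedAtMs);
    long finalDuration = finishedAtMs - startedAtMs;

    System.out.println("live estimate while running: " + liveEstimate);
    System.out.println("final duration from overlord: " + finalDuration);
    System.out.println("reported while running: " + reportedDurationMs(State.RUNNING, UNKNOWN_DURATION_MS));
    System.out.println("reported when finished: " + reportedDurationMs(State.SUCCESS, finalDuration));
  }
}
```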
||
&& taskTracker.status.getStatusCode() == TaskState.RUNNING) | ||
? System.currentTimeMillis() - taskTracker.startTimeMillis | ||
: taskTracker.status.getDuration(); | ||
|
||
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>()) | ||
.add(new WorkerStats(taskEntry.getKey(), taskTracker.status.getStatusCode(), duration)); | ||
} | ||
|
||
for (List<WorkerStats> workerStatsList : workerStats.values()) { | ||
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId)); | ||
} | ||
return workerStats; | ||
} | ||
|
||
private void mainLoop() | ||
{ | ||
try { | ||
|
@@ -823,5 +888,6 @@ public boolean isRetrying() | |
{ | ||
return isRetrying; | ||
} | ||
|
||
} | ||
} |
Review comment: Let's mark these fields final?
Reply: Can't, because of the default constructor.