Skip to content

Commit

Permalink
Modify the config
Browse files Browse the repository at this point in the history
Signed-off-by: zhangjunfan <junfan.zhang@outlook.com>
  • Loading branch information
zuston committed Nov 28, 2021
1 parent ce9876f commit 6eb393e
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ class AM implements Framework.ApplicationMasterAdapter {
private long runtimeInitialTime = System.currentTimeMillis();

// Group dependencies policy.
Map<String, List<String>> groupMembers;
Map<String, List<String>> memberInGroups;
Map<String, List<String>> grpWithMembersIndex;
Map<String, List<String>> taskInGrpsIndex;
// todo: Need to support single group dependent multiple other groups
Map<String, Pair<String, Long>> groupDependencies;
Map<String, Pair<String, Long>> taskWithDependentGrpsIndex;

@Override
public String constructClusterSpec(String taskId) throws IOException {
Expand Down Expand Up @@ -140,11 +140,11 @@ public boolean isHealthy(Configuration tonyConf) {
* So if we use the configuration as follows, when evaluator is still running after timeout and
* chief/workers are finished, the mechanism of dependency group timeout will make job failed.
*
* dependency group timeout configuration as follows:
* Dependency group timeout configuration as follows:
*
* tony.application.group.A = worker,chief
* tony.application.group.B = evaluator
* tony.application.dependency.B.after.timeout.A = 3600
* tony.application.dependency.evaluator.timeout.after.A = 3600
*
*/
String errorMsg = groupDependencyTimeout(tonyConf);
if (errorMsg != null) {
Expand All @@ -156,63 +156,70 @@ public boolean isHealthy(Configuration tonyConf) {

@VisibleForTesting
protected String groupDependencyTimeout(Configuration tonyConf) {
if (groupDependencies == null) {
groupDependencies = Utils.getGroupDependencies(tonyConf);
if (taskWithDependentGrpsIndex == null) {
taskWithDependentGrpsIndex = Utils.getJobTypeDependentGrps(tonyConf);
}

if (groupDependencies == null || groupDependencies.isEmpty()) {
// groupDependencies is map, key: waiting role, value: pre-dependent groups and waiting timeout
if (taskWithDependentGrpsIndex == null || taskWithDependentGrpsIndex.isEmpty()) {
return null;
}

if (groupMembers == null) {
groupMembers = Utils.getAllGroupJobTypes(tonyConf);
// groupMembers is map, key: groupName, value: its members in this group
if (grpWithMembersIndex == null) {
grpWithMembersIndex = Utils.getAllGroupJobTypes(tonyConf);
}

if (memberInGroups == null) {
memberInGroups = getMemberInGroups(groupMembers);
// memberInGroups is map. key: jobtype name, value: in which groups
if (taskInGrpsIndex == null) {
taskInGrpsIndex = getMemberInGroups(grpWithMembersIndex);
}


Map<String, TonySession.TonyTask[]> allTasks = session.getTonyTasks();
List<TonySession.TonyTask> runningTasks = session.getRunningTasks();

// Get the running jobs' type, like the tf roles of ps/worker/chief/evaluator
Set<String> runningJobTypes = runningTasks.stream()
.map(TonySession.TonyTask::getJobName)
.filter(jobname -> memberInGroups.containsKey(jobname))
.filter(jobname -> taskWithDependentGrpsIndex.containsKey(jobname))
.collect(Collectors.toSet());

for (String runningTaskType : runningJobTypes) {
for (String group : memberInGroups.get(runningTaskType)) {
if (!groupDependencies.containsKey(group)) {
continue;
}
Pair<String, Long> dependentGroupPair = taskWithDependentGrpsIndex.get(runningTaskType);
String dependentGroupName = dependentGroupPair.getKey();
long timeout = dependentGroupPair.getValue() * 1000;

Pair<String, Long> dependentGroupPair = groupDependencies.get(group);
String dependentGroupName = dependentGroupPair.getKey();
long timeout = dependentGroupPair.getValue() * 1000;
if (!grpWithMembersIndex.containsKey(dependentGroupName)) {
continue;
}

if (!groupMembers.containsKey(dependentGroupName)) {
continue;
}
boolean allDependentTaskFinished = true;
long latestEndTimeInAllDependentTasks = 0L;
for (String dependentsGroupJobtype : grpWithMembersIndex.get(dependentGroupName)) {

for (String dependentsGroupJobtype : groupMembers.get(dependentGroupName)) {
if (Utils.existRunningTasksWithJobtype(runningTasks, dependentsGroupJobtype)) {
continue;
}
// Find out the latest finished task in this task type, if the specified timeout exceed,
// make the job fail.
long latestFinishedTime =
Arrays.stream(allTasks.get(dependentsGroupJobtype))
.mapToLong(x -> x.getEndTime() == 0L ? System.currentTimeMillis() : x.getEndTime())
.max().getAsLong();

if (System.currentTimeMillis() - latestFinishedTime > timeout) {
return String.format("Jobtype: %s in group: %s runs exceeded timeout due it's "
+ "dependent jobtype: %s in group: %s has been finished.",
runningTaskType, group, dependentsGroupJobtype, dependentGroupName);
}
if (Utils.existRunningTasksWithJobtype(runningTasks, dependentsGroupJobtype)) {
allDependentTaskFinished = false;
break;
}

// Find out the latest finished task in this task type, if the specified timeout exceed,
// make the job fail.
latestEndTimeInAllDependentTasks = Math.max(
Arrays.stream(allTasks.get(dependentsGroupJobtype))
.mapToLong(x -> x.getEndTime())
.max().getAsLong(),
latestEndTimeInAllDependentTasks
);
}

if (!allDependentTaskFinished) {
continue;
}

if (System.currentTimeMillis() - latestEndTimeInAllDependentTasks > timeout) {
return String.format("Jobtype: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));
}
}

Expand Down
4 changes: 2 additions & 2 deletions tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ public static int getNumTotalTasks(Configuration conf) {
.sum();
}

public static Map<String, Pair<String, Long>> getGroupDependencies(Configuration tonyConf) {
public static Map<String, Pair<String, Long>> getJobTypeDependentGrps(Configuration tonyConf) {
return tonyConf.getValByRegex(TonyConfigurationKeys.GROUP_DEPEND_TIMEOUT_REGEX).keySet().stream()
.map(Utils::getDependentGrps)
.map(pair -> Utils.getDependentTimeout(tonyConf, pair))
.collect(Collectors.toMap(Triple::getLeft, x -> Pair.of(x.getMiddle(), x.getRight())));
.collect(Collectors.toMap(Triple::getLeft, x -> Pair.of(x.getMiddle(), x.getRight()), (oldV, newV) -> newV));
}

private static Triple<String, String, Long> getDependentTimeout(Configuration tonyConf, Pair<String, String> pair) {
Expand Down
4 changes: 1 addition & 3 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -642,10 +642,8 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
"--conf", "tony.worker.instances=2",
"--conf", "tony.worker.command=python forever_not_exit.py",
"--conf", "tony.application.framework=tensorflow",
"--container_env", Constants.SIDECAR_TB_TEST_KEY + "=true",
"--conf", "tony.application.group.A=chief",
"--conf", "tony.application.group.B=worker",
"--conf", "tony.application.dependency.B.timeout.after.A=10",
"--conf", "tony.application.dependency.worker.timeout.after.A=10",
});
client.addListener(handler);
int exitCode = client.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.linkedin.tony.runtime;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
Expand Down Expand Up @@ -96,8 +97,8 @@ public void testNeedReserveTBPort() {
public void testGroupDependencyNoConfShouldPass() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.dependency.A.timeout.after.B", "3600");
conf.set("tony.application.dependency.B.timeout.after.C", "3600");
conf.set("tony.application.dependency.evaluator.timeout.after.B", "3600");
conf.set("tony.application.dependency.chief.timeout.after.C", "3600");

TonySession session = buildMockSession(conf);
MLGenericRuntime.AM am = (MLGenericRuntime.AM) runtime.getAMAdapter();
Expand All @@ -112,8 +113,7 @@ public void testGroupDependencyShouldPass() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "worker,chief");
conf.set("tony.application.group.B", "evaluator");
conf.set("tony.application.dependency.B.timeout.after.A", "3600");
conf.set("tony.application.dependency.evaluator.timeout.after.A", "3600");

TonySession session = buildMockSession(conf);
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
Expand All @@ -123,8 +123,8 @@ public void testGroupDependencyShouldPass() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: evaluator in group: B runs exceeded timeout due it's dependent "
+ "jobtype: chief in group: A has been finished."
"Jobtype: evaluator runs exceeded timeout because it's dependent group: A "
+ "(task set: [worker,chief]) has been finished."
);
}

Expand All @@ -133,8 +133,7 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "chief");
conf.set("tony.application.group.B", "otherWorker");
conf.set("tony.application.dependency.B.timeout.after.A", "3600");
conf.set("tony.application.dependency.otherWorker.timeout.after.A", "3600");

TonySession session = buildMockSession(conf);
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
Expand All @@ -144,7 +143,7 @@ public void testGroupDependencyWorkerWhenChiefFinished() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: otherWorker in group: B runs exceeded timeout due it's dependent jobtype: chief in group: A has been finished."
"Jobtype: otherWorker runs exceeded timeout because it's dependent group: A (task set: [chief]) has been finished."
);
}

Expand All @@ -153,12 +152,10 @@ public void testGroupDependencyWithMultipleGroup() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "chief");
conf.set("tony.application.group.B", "otherWorker");
conf.set("tony.application.dependency.B.timeout.after.A", String.valueOf(60 * 240));
conf.set("tony.application.dependency.otherWorker.timeout.after.A", String.valueOf(60 * 240));

conf.set("tony.application.group.C", "chief");
conf.set("tony.application.group.D", "otherWorker");
conf.set("tony.application.dependency.D.timeout.after.C", "3600");
conf.set("tony.application.group.B", "chief,worker");
conf.set("tony.application.dependency.evaluator.timeout.after.B", "3600");

TonySession session = buildMockSession(conf);
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
Expand All @@ -168,17 +165,43 @@ public void testGroupDependencyWithMultipleGroup() {
am.setTonySession(session);
Assert.assertEquals(
am.groupDependencyTimeout(conf),
"Jobtype: otherWorker in group: D runs exceeded timeout due it's dependent jobtype: chief in group: C has been finished."
"Jobtype: evaluator runs exceeded timeout because it's dependent group: B (task set: [chief,worker]) has been finished."
);
}

/**
* Test case as follows:
* the role of chief has been finished, and otherWorker is running and not exceed the timeout. so it should pass
*/
@Test
public void testGroupDependencyWithoutTimeoutMultipleGroup() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "chief");
conf.set("tony.application.group.B", "otherWorker");
conf.set("tony.application.dependency.B.timeout.after.A", String.valueOf(60 * 240));
conf.set("tony.application.dependency.otherWorker.timeout.after.A", String.valueOf(60 * 240));

TonySession session = buildMockSession(conf);
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
chiefTask.setEndTime(System.currentTimeMillis() - 1000 * 60 * 120);

MLGenericRuntime.AM am = (MLGenericRuntime.AM) runtime.getAMAdapter();
am.setTonySession(session);
Assert.assertNull(
am.groupDependencyTimeout(conf)
);
}

/**
* Test case as follows:
* the role of chief has finished, but otherWorker is running.
* And the role of evaluator depends on GroupA including chief and otherWorker, so it will not throw exception.
*/
@Test
public void testGrpDependentWithoutTimeout() {
Configuration conf = new Configuration();
conf.addResource("tony-default.xml");
conf.set("tony.application.group.A", "chief,otherWorker");
conf.set("tony.application.dependency.evaluator.timeout.after.A", String.valueOf(60 * 240));

TonySession session = buildMockSession(conf);
TonySession.TonyTask chiefTask = session.getTask("chief", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testGetGroupDependencies() {
conf.set("tony.application.dependency.A.timeout.after.B", "3600");
conf.set("tony.application.dependency.B.timeout.after.C", "3600");

Map<String, Pair<String, Long>> dependenciesIndex = Utils.getGroupDependencies(conf);
Map<String, Pair<String, Long>> dependenciesIndex = Utils.getJobTypeDependentGrps(conf);
assertTrue(dependenciesIndex.containsKey("A"));
assertTrue(dependenciesIndex.containsKey("B"));
assertEquals(dependenciesIndex.get("A").getKey(), "B");
Expand Down

0 comments on commit 6eb393e

Please sign in to comment.