Skip to content

Commit

Permalink
Configurable status when dependency times out
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Feb 5, 2022
1 parent a014d68 commit f390386
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,4 @@ For more information about TonY, check out the following:

3. Some of my TensorFlow workers hang after the chief has finished, or the evaluator hangs after the chief and workers have finished.

Please see the [PR#521](https://github.com/tony-framework/TonY/pull/621) on Tensorflow configuration to solve it.
Please see [PR#621](https://github.com/tony-framework/TonY/pull/621) and [Issue#641](https://github.com/tony-framework/TonY/issues/641) for the TensorFlow configuration that solves it.
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ public static String getContainerDockerMountKey() {
// Matches "<prefix>group.<name>"; the capture is the group name.
public static final String GROUP_REGEX = TONY_APPLICATION_PREFIX + "group\\.([A-Za-z]+)$";
// Matches "<prefix>dependency.<X>.timeout.after.<GROUP>"; captures X and GROUP in that order.
public static final String GROUP_DEPEND_TIMEOUT_REGEX =
    TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)$";
// Matches "<prefix>dependency.<X>.timeout.after.<GROUP>.ignored"; captures X and GROUP.
// The end anchor must follow the ".ignored" suffix (with the dot escaped): an anchor placed
// before the suffix demands end-of-input and then more characters, so it can never match.
public static final String GROUP_DEPEND_TIMEOUT_IGNORE_REGEX =
    TONY_APPLICATION_PREFIX + "dependency\\.([A-Za-z]+)\\.timeout\\.after\\.([A-Za-z]+)\\.ignored$";

public static String getGroupKey(String groupName) {
return String.format(TONY_APPLICATION_PREFIX + "group.%s", groupName);
Expand All @@ -357,4 +359,8 @@ public static String getGroupKey(String groupName) {
/**
 * Builds the configuration key "dependency.&lt;grp&gt;.timeout.after.&lt;dependentGrp&gt;"
 * under the TonY application prefix.
 *
 * @param grp name substituted into the first slot of the key
 * @param dependentGrp name substituted into the second slot of the key
 * @return the fully formatted configuration key
 */
public static String getGroupDependentKey(String grp, String dependentGrp) {
  final String keyTemplate = TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s";
  return String.format(keyTemplate, grp, dependentGrp);
}

/**
 * Builds the configuration key
 * "dependency.&lt;roleType&gt;.timeout.after.&lt;dependentGrp&gt;.ignored"
 * under the TonY application prefix.
 *
 * @param roleType name substituted into the first slot of the key
 * @param dependentGrp name substituted into the second slot of the key
 * @return the fully formatted configuration key
 */
public static String getGroupDependentIgnoredKey(String roleType, String dependentGrp) {
  final String keyTemplate = TONY_APPLICATION_PREFIX + "dependency.%s.timeout.after.%s.ignored";
  return String.format(keyTemplate, roleType, dependentGrp);
}
}
10 changes: 10 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/TonySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -37,6 +38,8 @@

import static com.linkedin.tony.Constants.CHIEF_JOB_NAME;
import static com.linkedin.tony.Constants.WORKER_JOB_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.UNTRACKED_JOBTYPES;
import static com.linkedin.tony.util.Utils.getUntrackedJobTypes;


/**
Expand Down Expand Up @@ -670,4 +673,11 @@ public int getNumRegisteredTasks() {
/**
 * Returns the {@code registeredTasks} field itself (no copy is made).
 *
 * @return the set held in {@code registeredTasks}
 */
public Set<String> getRegisteredTasks() {
  return this.registeredTasks;
}

/**
 * Adds {@code taskType} to the untracked job types stored in {@code tonyConf} under
 * {@code UNTRACKED_JOBTYPES}.
 *
 * <p>The call is idempotent: if the type is already listed, the configuration is left
 * untouched, so repeated invocations cannot pile up duplicate entries in the
 * comma-separated value.
 *
 * @param taskType the task type to stop tracking
 */
public void makeTaskTypeUntracked(String taskType) {
  String[] currentUntrackedTypes = getUntrackedJobTypes(tonyConf);
  List<String> untrackedList = Arrays.stream(currentUntrackedTypes).collect(Collectors.toList());
  if (untrackedList.contains(taskType)) {
    // Already untracked; rewriting the value would only append a duplicate.
    return;
  }
  untrackedList.add(taskType);
  tonyConf.set(UNTRACKED_JOBTYPES, StringUtils.join(untrackedList, ","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.linkedin.tony.util.Utils;

import static com.linkedin.tony.Constants.SIDECAR_TB_ROLE_NAME;
import static com.linkedin.tony.TonyConfigurationKeys.getGroupDependentIgnoredKey;

public abstract class MLGenericRuntime extends AbstractFrameworkRuntime {
private static final long REGISTRATION_STATUS_INTERVAL_MS = 15 * 1000;
Expand Down Expand Up @@ -141,9 +142,16 @@ public boolean isHealthy(Configuration tonyConf) {
* chief/workers are finished, the mechanism of dependency group timeout will make job failed.
*
* Dependency group timeout configuration as follows:
*
* ```
* tony.application.group.A = worker,chief
* tony.application.dependency.evaluator.timeout.after.A = 3600
* ```
*
* And in some of the cases, we don't want to fail the whole job even though a dependency times out.
* For example, if chief succeeded and there is a worker hanging for 1 hour,
* users can configure the job to still pass. So it introduces the new config of
* `tony.application.dependency.[X].timeout.after.[GROUP].ignored = true`, and more details could be
* found in https://github.com/tony-framework/TonY/issues/641.
*
*/
String errorMsg = groupDependencyTimeout(tonyConf);
Expand Down Expand Up @@ -231,10 +239,17 @@ protected String groupDependencyTimeout(Configuration tonyConf) {
}

if (System.currentTimeMillis() - latestEndTimeInAllDependentTasks > timeout) {
return String.format("Jobtype: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));

String ignoredTaskTypeKey = getGroupDependentIgnoredKey(runningTaskType, dependentGroupName);
boolean ignoreTimeout = tonyConf.getBoolean(ignoredTaskTypeKey, false);
if (!ignoreTimeout) {
return String.format("Task Type: %s runs exceeded timeout because it's "
+ "dependent group: %s (task set: [%s]) has been finished.",
runningTaskType, dependentGroupName,
StringUtils.join(grpWithMembersIndex.get(dependentGroupName), ","));
}

session.makeTaskTypeUntracked(runningTaskType);
}
}

Expand Down
49 changes: 49 additions & 0 deletions tony-core/src/test/java/com/linkedin/tony/TestTonyE2E.java
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,55 @@ public void testGroupDependencyTimeoutShouldPass() throws ParseException, IOExce
client.removeListener(handler);
}

/**
 * Runs a job with one chief (group A) and two workers that execute
 * forever_not_exit.py, with a 10-unit worker dependency timeout after group A and
 * "tony.application.dependency.worker.timeout.after.A.ignored=true".
 * The client is expected to exit with code 0.
 */
@Test
public void testTaskWithDependencyTimeoutButIgnoredShouldPass() throws Exception {
  String[] clientArgs = {
      "--src_dir", "tony-core/src/test/resources/scripts",
      "--hdfs_classpath", libPath,
      "--container_env", Constants.SKIP_HADOOP_PATH + "=true",
      "--python_venv", "tony-core/src/test/resources/test.zip",
      "--executes", "python exit_0.py",
      "--conf", "tony.chief.instances=1",
      "--conf", "tony.worker.instances=2",
      "--conf", "tony.worker.command=python forever_not_exit.py",
      "--conf", "tony.application.framework=tensorflow",
      "--conf", "tony.application.group.A=chief",
      "--conf", "tony.application.dependency.worker.timeout.after.A=10",
      "--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
  };
  client.init(clientArgs);
  Assert.assertEquals(client.start(), 0);
}

/**
 * Same scenario as the ignored-dependency-timeout test, but with an additional PS task
 * (the default untracked job type) that also runs forever_not_exit.py.
 * The client is expected to exit with code 0.
 */
@Test
public void testTaskWithDependencyTimeoutButIgnoredAndWithPSShouldPass() throws Exception {
  String[] clientArgs = {
      "--src_dir", "tony-core/src/test/resources/scripts",
      "--hdfs_classpath", libPath,
      "--container_env", Constants.SKIP_HADOOP_PATH + "=true",
      "--python_venv", "tony-core/src/test/resources/test.zip",
      "--executes", "python exit_0.py",
      "--conf", "tony.chief.instances=1",
      "--conf", "tony.worker.instances=2",
      "--conf", "tony.worker.command=python forever_not_exit.py",
      "--conf", "tony.ps.instances=1",
      "--conf", "tony.ps.command=python forever_not_exit.py",
      "--conf", "tony.application.framework=tensorflow",
      "--conf", "tony.application.group.A=chief",
      "--conf", "tony.application.dependency.worker.timeout.after.A=10",
      "--conf", "tony.application.dependency.worker.timeout.after.A.ignored=true",
  };
  client.init(clientArgs);
  Assert.assertEquals(client.start(), 0);
}

@Test
public void testLostConnectionWithAMJobShouldFail() throws Exception {
client.init(new String[]{
Expand Down

0 comments on commit f390386

Please sign in to comment.