Skip to content

Commit

Permalink
fix task: atomic update/get fields and re-schedule (#1361)
Browse files Browse the repository at this point in the history
* atomic update and get task status&result
* call resubmitTask() when re-schedule task

Change-Id: I2efa9fd4979492c6901e1ae11fbaf2be451f0354
  • Loading branch information
javeme committed Mar 29, 2021
1 parent 0f3be14 commit 1da7036
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
34 changes: 17 additions & 17 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,13 @@ public String result() {
return this.result;
}

private void result(String result) {
private synchronized boolean result(TaskStatus status, String result) {
checkPropertySize(result, P.RESULT);
this.result = result;
if (this.status(status)) {
this.result = result;
return true;
}
return false;
}

public void server(Id server) {
Expand Down Expand Up @@ -319,18 +323,17 @@ public boolean fail(Throwable e) {
LOG.warn("An exception occurred when running task: {}",
this.id(), e);
// Update status to FAILED if exception occurred(not interrupted)
if (this.status(TaskStatus.FAILED)) {
this.result(e.toString());
if (this.result(TaskStatus.FAILED, e.toString())) {
return true;
}
}
return false;
}

public void failSave(Throwable e) {
public void failToSave(Throwable e) {
if (!this.fail(e)) {
// Can't update status, just set result to error message
this.result(e.toString());
this.result = e.toString();
}
}

Expand All @@ -352,9 +355,7 @@ protected void done() {
protected void set(V v) {
String result = JsonUtil.toJson(v);
checkPropertySize(result, P.RESULT);
if (this.status(TaskStatus.SUCCESS)) {
this.result = result;
} else {
if (!this.result(TaskStatus.SUCCESS, result)) {
assert this.completed();
}
// Will call done() and may cause to save to store
Expand All @@ -381,22 +382,21 @@ protected boolean checkDependenciesSuccess() {
if (this.dependencies == null || this.dependencies.isEmpty()) {
return true;
}
TaskScheduler scheduler = this.scheduler();
for (Id dependency : this.dependencies) {
HugeTask<?> task = this.scheduler().task(dependency);
HugeTask<?> task = scheduler.task(dependency);
if (!task.completed()) {
// Dependent task not completed, re-schedule self
this.scheduler().schedule(this);
scheduler.schedule(this);
return false;
} else if (task.status() == TaskStatus.CANCELLED) {
this.status(TaskStatus.CANCELLED);
this.result(String.format(
this.result(TaskStatus.CANCELLED, String.format(
"Cancelled due to dependent task '%s' cancelled",
dependency));
this.done();
return false;
} else if (task.status() == TaskStatus.FAILED) {
this.status(TaskStatus.FAILED);
this.result(String.format(
this.result(TaskStatus.FAILED, String.format(
"Failed due to dependent task '%s' failed",
dependency));
this.done();
Expand Down Expand Up @@ -483,7 +483,7 @@ protected void property(String key, Object value) {
}
}

protected Object[] asArray() {
protected synchronized Object[] asArray() {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down Expand Up @@ -563,7 +563,7 @@ public Map<String, Object> asMap() {
return this.asMap(true);
}

public Map<String, Object> asMap(boolean withDetails) {
public synchronized Map<String, Object> asMap(boolean withDetails) {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ private <V> Future<?> restore(HugeTask<V> task) {
public <V> Future<?> schedule(HugeTask<V> task) {
E.checkArgumentNotNull(task, "Task can't be null");

if (task.status() == TaskStatus.QUEUED) {
/*
* Just submit to queue if status=QUEUED (means re-schedule task)
* NOTE: schedule() method may be called multi times by
* HugeTask.checkDependenciesSuccess() method
*/
return this.resubmitTask(task);
}

if (task.callable() instanceof EphemeralJob) {
/*
* Due to EphemeralJob won't be serialized and deserialized through
Expand Down Expand Up @@ -252,6 +261,16 @@ private <V> Future<?> submitTask(HugeTask<V> task) {
return this.taskExecutor.submit(task);
}

private <V> Future<?> resubmitTask(HugeTask<V> task) {
E.checkArgument(task.status() == TaskStatus.QUEUED,
"Can't resubmit task '%s' with status %s",
task.id(), TaskStatus.QUEUED);
E.checkArgument(this.tasks.containsKey(task.id()),
"Can't resubmit task '%s' not been submitted before",
task.id());
return this.taskExecutor.submit(task);
}

public <V> void initTaskCallable(HugeTask<V> task) {
task.scheduler(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ protected void save() {
e, task.asMap(false));
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failSave(e);
task.failToSave(e);
this.graph().taskScheduler().save(task);
return;
}
Expand Down

0 comments on commit 1da7036

Please sign in to comment.