From 1da70365a9e0c67d77610df05c4b9b9e10eb2db5 Mon Sep 17 00:00:00 2001 From: Jermy Li Date: Tue, 2 Mar 2021 14:51:25 +0800 Subject: [PATCH] fix task: atomic update/get fields and re-schedule (#1361) * atomic update and get task status&result * call resubmitTask() when re-schedule task Change-Id: I2efa9fd4979492c6901e1ae11fbaf2be451f0354 --- .../com/baidu/hugegraph/task/HugeTask.java | 34 +++++++++---------- .../hugegraph/task/StandardTaskScheduler.java | 19 +++++++++++ .../baidu/hugegraph/task/TaskCallable.java | 2 +- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java index 55cebaf09b..28086ba399 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java @@ -222,9 +222,13 @@ public String result() { return this.result; } - private void result(String result) { + private synchronized boolean result(TaskStatus status, String result) { checkPropertySize(result, P.RESULT); - this.result = result; + if (this.status(status)) { + this.result = result; + return true; + } + return false; } public void server(Id server) { @@ -319,18 +323,17 @@ public boolean fail(Throwable e) { LOG.warn("An exception occurred when running task: {}", this.id(), e); // Update status to FAILED if exception occurred(not interrupted) - if (this.status(TaskStatus.FAILED)) { - this.result(e.toString()); + if (this.result(TaskStatus.FAILED, e.toString())) { return true; } } return false; } - public void failSave(Throwable e) { + public void failToSave(Throwable e) { if (!this.fail(e)) { // Can't update status, just set result to error message - this.result(e.toString()); + this.result = e.toString(); } } @@ -352,9 +355,7 @@ protected void done() { protected void set(V v) { String result = JsonUtil.toJson(v); checkPropertySize(result, P.RESULT); - if (this.status(TaskStatus.SUCCESS)) { - this.result = result; - } else { + if (!this.result(TaskStatus.SUCCESS, result)) { assert this.completed(); } // Will call done() and may cause to save to store @@ -381,22 +382,21 @@ protected boolean checkDependenciesSuccess() { if (this.dependencies == null || this.dependencies.isEmpty()) { return true; } + TaskScheduler scheduler = this.scheduler(); for (Id dependency : this.dependencies) { - HugeTask task = this.scheduler().task(dependency); + HugeTask task = scheduler.task(dependency); if (!task.completed()) { // Dependent task not completed, re-schedule self - this.scheduler().schedule(this); + scheduler.schedule(this); return false; } else if (task.status() == TaskStatus.CANCELLED) { - this.status(TaskStatus.CANCELLED); - this.result(String.format( + this.result(TaskStatus.CANCELLED, String.format( "Cancelled due to dependent task '%s' cancelled", dependency)); this.done(); return false; } else if (task.status() == TaskStatus.FAILED) { - this.status(TaskStatus.FAILED); - this.result(String.format( + this.result(TaskStatus.FAILED, String.format( "Failed due to dependent task '%s' failed", dependency)); this.done(); @@ -483,7 +483,7 @@ protected void property(String key, Object value) { } } - protected Object[] asArray() { + protected synchronized Object[] asArray() { E.checkState(this.type != null, "Task type can't be null"); E.checkState(this.name != null, "Task name can't be null"); @@ -563,7 +563,7 @@ public Map asMap() { return this.asMap(true); } - public Map asMap(boolean withDetails) { + public synchronized Map asMap(boolean withDetails) { E.checkState(this.type != null, "Task type can't be null"); E.checkState(this.name != null, "Task name can't be null"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java index 5d46899d8c..3bbf723328 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java @@ -205,6 +205,15 @@ private Future restore(HugeTask task) { public Future schedule(HugeTask task) { E.checkArgumentNotNull(task, "Task can't be null"); + if (task.status() == TaskStatus.QUEUED) { + /* + * Just submit to queue if status=QUEUED (means re-schedule task) + * NOTE: schedule() method may be called multi times by + * HugeTask.checkDependenciesSuccess() method + */ + return this.resubmitTask(task); + } + if (task.callable() instanceof EphemeralJob) { /* * Due to EphemeralJob won't be serialized and deserialized through @@ -252,6 +261,16 @@ private Future submitTask(HugeTask task) { return this.taskExecutor.submit(task); } + private Future resubmitTask(HugeTask task) { + E.checkArgument(task.status() == TaskStatus.QUEUED, + "Can't resubmit task '%s' with status %s", + task.id(), TaskStatus.QUEUED); + E.checkArgument(this.tasks.containsKey(task.id()), + "Can't resubmit task '%s' not been submitted before", + task.id()); + return this.taskExecutor.submit(task); + } + public void initTaskCallable(HugeTask task) { task.scheduler(this); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java index bb21ac8554..cc37a34327 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java @@ -117,7 +117,7 @@ protected void save() { e, task.asMap(false)); String message = e.getMessage(); if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) { - task.failSave(e); + task.failToSave(e); this.graph().taskScheduler().save(task); return; }