Skip to content

Commit

Permalink
Avoid registering listeners on already completed tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Aug 2, 2022
1 parent d89de65 commit 2027602
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,16 @@ public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOE
}

/**
* Registers a task resource tracking completion listener on this task.
* Registers a task resource tracking completion listener on this task if resource tracking is still active.
* Returns true on successful subscription, false otherwise.
*/
public void addResourceTrackingCompletionListener(NotifyOnceListener<Task> listener) {
resourceTrackingCompletionListeners.add(listener);
public boolean addResourceTrackingCompletionListener(NotifyOnceListener<Task> listener) {
if (numActiveResourceTrackingThreads.get() > 0) {
resourceTrackingCompletionListeners.add(listener);
return true;
}

return false;
}

/**
Expand Down
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Task register(String type, String action, TaskAwareRequest request) {
}

if (task.supportsResourceTracking()) {
task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
boolean success = task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
protected void innerOnResponse(Task task) {
// Stop tracking the task once the last thread has been marked inactive.
Expand All @@ -175,6 +175,13 @@ protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
}
});

if (success == false) {
logger.debug(
"failed to register a completion listener as task resource tracking has already completed [taskId={}]",
task.getId()
);
}
}

if (task instanceof CancellableTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ protected void doRun() {
// operationFinishedValidator will be called just after all task threads are marked inactive and
// the task is unregistered.
if (taskTestContext.operationFinishedValidator != null) {
task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
boolean success = task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
protected void innerOnResponse(Task task) {
taskTestContext.operationFinishedValidator.accept(task, threadId.get());
Expand All @@ -196,6 +196,10 @@ protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
}
});

if (success == false) {
fail("failed to register a completion listener as task resource tracking has already completed");
}
}

Object[] allocation1 = new Object[1000000]; // 4MB
Expand Down

0 comments on commit 2027602

Please sign in to comment.