Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search Task Resource Tracking PoC #1643

Conversation

tushar-kharbanda72
Copy link
Contributor

@tushar-kharbanda72 tushar-kharbanda72 commented Dec 2, 2021

Description

This PoC is to capture the system resource overhead for Search requests either on data node or coordinator node. It captures all the heap allocations being done by the threads running on search threadpool. It also captures the heap overhead for the responses received on coordinator while it is waiting for other data nodes to respond back (this serialization of response is generally done on transport_worker and doesn't get accounted for search threadpool tracking).

This PoC just aims to validate correctness and performance impact. Post these validations I'll focus more on concrete design aspect of the changes

Issues Resolved

#1179

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@tushar-kharbanda72 tushar-kharbanda72 requested a review from a team as a code owner December 2, 2021 08:54
@opensearch-ci-bot
Copy link
Collaborator

Can one of the admins verify this patch?

@opensearch-ci-bot
Copy link
Collaborator

✅   Gradle Wrapper Validation success a439a4d

@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Precommit failure a439a4d
Log 1683

@opensearch-ci-bot
Copy link
Collaborator

❌   Gradle Check failure a439a4d
Log 1298

Reports 1298

@@ -283,6 +287,10 @@ public void innerOnResponse(Result result) {
} finally {
executeNext(pendingExecutions, thread);
}
ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expensive. Move this to a static final variable

@@ -283,6 +287,10 @@ public void innerOnResponse(Result result) {
} finally {
executeNext(pendingExecutions, thread);
}
ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
long bytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it is worth checking if thread allocation tracking is enabled & supported, for not doing unnecessary work: ThreadMXBean::isThreadAllocatedMemorySupported() and ThreadMXBean::isThreadAllocatedMemoryEnabled()

*/
private TaskResourceTracker() {
taskMap = new ConcurrentHashMap<>();
threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static final

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and we could remove direct usage of theThreadMXBean from all other places, just fence it behind TaskResourceTracker

Comment on lines +156 to +169
// just register read operations
if (action.startsWith("indices:data/read")) {
if (threadContext.getTransient("TASK_ID") == null) {
threadContext.putTransient("TASK_ID", String.valueOf(task.getId()));

List<String> indices = new ArrayList<>();
if (request instanceof IndicesRequest) {
indices = Arrays.asList(((IndicesRequest) request).indices());
}
// TODO Add shard id handling
TaskResourceTracker.getInstance().registerTaskForTracking(task.getId(), indices, null, action);
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't this be modelled as a TaskListener

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class TaskResourceTracker implements ResourceWatcher {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add java docs for all new classes

Comment on lines +58 to +73
public void registerTaskForTracking(long taskId, List<String> indices, ShardId shardId, String actionName) {
taskMap.put(new TaskInfoKey(taskId, indices, shardId, actionName), new ArrayList<>());
}

public void registerWorkerForTask(long taskId, long workerId, long cpuCurrent, long bytesCurrent, String threadpoolName) {
// TODO remove this after identifying cases where it can be true
if (taskMap.get(new TaskInfoKey(taskId)) == null) {
return;
}

TaskWorkerResourceUtilInfo taskWorkerResourceUtilInfo =
new TaskWorkerResourceUtilInfo(workerId, cpuCurrent, cpuCurrent, bytesCurrent, bytesCurrent,
true, threadpoolName);

taskMap.get(new TaskInfoKey(taskId)).add(taskWorkerResourceUtilInfo);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above maybe model as a TaskListener?

Comment on lines +28 to +33
public TaskInfoKey(long taskId, List<String> indices, ShardId shardId, String action) {
this.taskId = taskId;
this.indices = indices;
this.shardId = shardId;
this.action = action;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task is so far not bound to a ShardId, this should be more generic

Comment on lines +302 to +304
ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
long bytesStart = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThreadMXBean is spread all over the place. Lets simplify

Comment on lines +307 to +313

// long bytesEnd = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
if (response instanceof SearchPhaseResult) {
// TaskResourceTracker.getInstance().registerResponseOverhead(((SearchPhaseResult) response).getShardSearchRequest().getParentTask().getId(), bytesEnd - bytesStart);
TaskResourceTracker.getInstance().registerResponseOverhead1(response, bytesStart);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why place it in InboundHandler

import org.opensearch.ExceptionsHelper;
import org.opensearch.tasks.TaskResourceTracker;

public class ResourceRunnable extends AbstractRunnable implements WrappedRunnable {
Copy link
Collaborator

@Bukhtawar Bukhtawar Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add java docs to explain how threads running cost is computed and associated with the corresponding task

@@ -431,6 +432,7 @@ protected Node(
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
resourceWatcherService.add(TaskResourceTracker.getInstance(), ResourceWatcherService.Frequency.HIGH);
Copy link
Collaborator

@Bukhtawar Bukhtawar Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to watch per 5s, seems wasteful. Instead it should be tied to the overall usage subject to high utilization beyond a threshold or individual task level resource utilization or on-demand

Comment on lines +292 to +293

TaskResourceTracker.getInstance().transfer(task.getId(), result, bytes);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be too hard to maintain the code base with this construct. Lets simplify

Comment on lines +95 to +99
if ("tw".equals(taskWorkerResourceUtilInfo.getThreadPoolName())) {
response[0] += taskWorkerResourceUtilInfo.getOverheardBytes();
} else {
search[0] += taskWorkerResourceUtilInfo.getHeapNow() - taskWorkerResourceUtilInfo.getHeapStart();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transport worker is a very critical thread. I'll advice against any custom logic for it.

Comment on lines +135 to +144
public void transfer(long taskId, Object ob, long bytes) {
TaskInfoKey key = new TaskInfoKey(taskId);
if (!overhead.containsKey(ob) || taskMap.get(key) == null) return;

long bytesStart = overhead.get(ob);

TaskWorkerResourceUtilInfo t = new TaskWorkerResourceUtilInfo(1L, 0L, 0L, bytesStart, bytes, false, "tw");
taskMap.get(key).add(t);
overhead.remove(ob);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please elaborate on this logic.

Comment on lines +91 to +98
taskMap.forEach((taskInfoKey, taskWorkerResourceUtilInfos) -> taskWorkerResourceUtilInfos.forEach(taskWorkerResourceUtilInfo -> {
if (taskWorkerResourceUtilInfo.isActive()) {
taskWorkerResourceUtilInfo.setHeapNow(threadMXBean.getThreadAllocatedBytes(taskWorkerResourceUtilInfo.getWorkerId()));
}
if ("tw".equals(taskWorkerResourceUtilInfo.getThreadPoolName())) {
response[0] += taskWorkerResourceUtilInfo.getOverheardBytes();
} else {
search[0] += taskWorkerResourceUtilInfo.getHeapNow() - taskWorkerResourceUtilInfo.getHeapStart();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest you use the below static assignment for optimizing the overhead of ThreadMXBean

+    static {
+        threadMXBean = ManagementFactory.getThreadMXBean();
+        Method getBytes;
+        try {
+            getBytes = threadMXBean.getClass()
+                    .getMethod("getThreadAllocatedBytes", long[].class);
+            getBytes.setAccessible(true);
+        } catch (NoSuchMethodException e) {
+            getBytes = null;
+        }
+        getThreadAllocatedBytes = getBytes;
+    }

@@ -150,6 +153,20 @@ public Task register(String type, String action, TaskAwareRequest request) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
}

// just register read operations
if (action.startsWith("indices:data/read")) {
Copy link
Collaborator

@reta reta Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to enrich action with something like isResourceTrackingEnabled() and use it as an indicator of the need to track resources. Also, the tracking key (in this case, indices, but is action specific) has to be provided by the action as as well, fe as getResourceTrackingKey method.

Comment on lines +62 to +73
public void registerWorkerForTask(long taskId, long workerId, long cpuCurrent, long bytesCurrent, String threadpoolName) {
// TODO remove this after identifying cases where it can be true
if (taskMap.get(new TaskInfoKey(taskId)) == null) {
return;
}

TaskWorkerResourceUtilInfo taskWorkerResourceUtilInfo =
new TaskWorkerResourceUtilInfo(workerId, cpuCurrent, cpuCurrent, bytesCurrent, bytesCurrent,
true, threadpoolName);

taskMap.get(new TaskInfoKey(taskId)).add(taskWorkerResourceUtilInfo);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be concurrency bugs with these checks between the two gets

TaskInfoKey key = new TaskInfoKey(taskId);
if (!overhead.containsKey(ob) || taskMap.get(key) == null) return;

long bytesStart = overhead.get(ob);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NPE here, if the key gets removed in between check and usage. Use Long instead

@@ -74,6 +88,8 @@ public Stats(StreamInput in) throws IOException {
rejected = in.readLong();
largest = in.readInt();
completed = in.readLong();
bytes = in.readLong();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap in version checks:

            if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
               bytes = in.readLong();
               ro = in.readLong(); 
            }

@@ -85,6 +101,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(rejected);
out.writeInt(largest);
out.writeLong(completed);
out.writeLong(bytes);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
               out.writeLong(bytes);
               out.writeLong(ro);
            }

Copy link
Collaborator

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Tushar, this is good stuff, I like the thought you have put into this espl this being your first contribution.

@tushar-kharbanda72
Copy link
Contributor Author

Thanks @reta and @Bukhtawar for taking out time and reviewing this PoC code. I have started thinking more on the final design for this and divided this into 4 to 5 meaningful chunks. Will start raising PRs for prod ready code from next week (making sure these comments are addressed as well).

@dblock
Copy link
Member

dblock commented Mar 21, 2022

@tushar-kharbanda72 Want to finish this?

@tushar-kharbanda72
Copy link
Contributor Author

@tushar-kharbanda72 Want to finish this?

@dblock Implemented this feature. Closing this POC PR.

Feature implementation PRs:

PR 1: #2089
PR 2: #2639

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants