From 7f715a4cbeb6e899b7784dd5cff91aad3bd0b66a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 1 Aug 2024 16:53:03 +0200 Subject: [PATCH] Only use at most a single thread for search context freeing (#111156) Forking to `GENERIC` makes sense here since we sporadically block for a macroscopic amount of time to protect transport threads, but in almost all cases the operation operates on the same data structures and is very fast. Since it's also very frequent, we shouldn't be creating a bunch of generic threads during a burst -> lets throttle to a single thread. --- .../action/search/SearchTransportService.java | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index fb3c49d83cb93..52d4542faaf77 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -24,9 +24,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -60,6 +63,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.function.BiFunction; import static org.elasticsearch.action.search.SearchTransportAPMMetrics.ACTION_ATTRIBUTE_NAME; @@ -455,9 +459,10 @@ public static void registerRequestHandler( boolean freed = searchService.freeReaderContext(request.id()); channel.sendResponse(SearchFreeContextResponse.of(freed)); }; + final Executor freeContextExecutor = buildFreeContextExecutor(transportService); transportService.registerRequestHandler( FREE_CONTEXT_SCROLL_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, ScrollFreeContextRequest::new, instrumentedHandler(FREE_CONTEXT_SCROLL_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler) ); @@ -470,7 +475,7 @@ public static void registerRequestHandler( transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, SearchFreeContextRequest::new, instrumentedHandler(FREE_CONTEXT_ACTION_METRIC, transportService, searchTransportMetrics, freeContextHandler) ); @@ -478,7 +483,7 @@ public static void registerRequestHandler( transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, - transportService.getThreadPool().generic(), + freeContextExecutor, ClearScrollContextsRequest::new, instrumentedHandler(CLEAR_SCROLL_CONTEXTS_ACTION_METRIC, transportService, searchTransportMetrics, (request, channel, task) -> { searchService.freeAllScrollContexts(); @@ -626,6 +631,32 @@ public static void registerRequestHandler( TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); } + private static Executor buildFreeContextExecutor(TransportService transportService) { + final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner( + "free_context", + 1, + transportService.getThreadPool().generic() + ); + return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + try (releasable) { + r.run(); + } + } + + @Override + public void onFailure(Exception e) { + if (r instanceof AbstractRunnable abstractRunnable) { + abstractRunnable.onFailure(e); + } + // should be impossible, GENERIC pool doesn't reject anything + logger.error("unexpected failure running " + r, e); + assert false : new AssertionError("unexpected failure running " + r, e); + } + }); + } + private static TransportRequestHandler instrumentedHandler( String actionQualifier, TransportService transportService,