From 72fbaf2e56e7606e4ba8862dbcd70a2687bb843b Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 10 Sep 2024 11:36:36 +0530 Subject: [PATCH] Non querying tasks shouldn't use processing buffers / merge buffers (#16887) Tasks that do not support querying or query processing i.e. supportsQueries = false do not require processing threads, processing buffers, and merge buffers. --- .../druid/guice/PeonProcessingModule.java | 143 ++++++++++++++++++ .../druid/indexing/common/task/Task.java | 4 +- .../druid/query/NoopQueryProcessingPool.java | 134 ++++++++++++++++ .../druid/guice/BrokerProcessingModule.java | 31 +--- .../druid/guice/DruidProcessingModule.java | 85 ++++++++--- .../druid/guice/RouterProcessingModule.java | 12 +- .../java/org/apache/druid/cli/CliPeon.java | 4 +- 7 files changed, 352 insertions(+), 61 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java create mode 100644 processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java diff --git a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java new file mode 100644 index 000000000000..8961da7a5559 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.client.cache.CachePopulator; +import org.apache.druid.client.cache.CachePopulatorStats; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DummyBlockingPool; +import org.apache.druid.collections.DummyNonBlockingPool; +import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.guice.annotations.Global; +import org.apache.druid.guice.annotations.Merging; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.ExecutorServiceMonitor; +import org.apache.druid.query.NoopQueryProcessingPool; +import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByResourcesReservationPool; + +import java.nio.ByteBuffer; + +/** + * This module fulfills the dependency injection of query processing and caching resources: buffer pools and + * thread pools on Peon selectively. Only the peons for the tasks supporting queries need to allocate direct buffers + * and thread pools. Thus, this is separate from the {@link DruidProcessingModule} to separate the needs of the peons and + * the historicals + * + * @see DruidProcessingModule + */ +public class PeonProcessingModule implements Module +{ + private static final Logger log = new Logger(PeonProcessingModule.class); + + @Override + public void configure(Binder binder) + { + DruidProcessingModule.registerConfigsAndMonitor(binder); + } + + @Provides + @LazySingleton + public CachePopulator getCachePopulator( + @Smile ObjectMapper smileMapper, + CachePopulatorStats cachePopulatorStats, + CacheConfig cacheConfig + ) + { + return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig); + } + + @Provides + @ManageLifecycle + public QueryProcessingPool getProcessingExecutorPool( + Task task, + DruidProcessingConfig config, + ExecutorServiceMonitor executorServiceMonitor, + Lifecycle lifecycle + ) + { + if (task.supportsQueries()) { + return DruidProcessingModule.createProcessingExecutorPool(config, executorServiceMonitor, lifecycle); + } else { + if (config.isNumThreadsConfigured()) { + log.warn( + "Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not support queries", + config.getNumThreads(), + task.getId(), + task.getType() + ); + } + return NoopQueryProcessingPool.instance(); + } + } + + @Provides + @LazySingleton + @Global + public NonBlockingPool getIntermediateResultsPool(Task task, DruidProcessingConfig config) + { + if (task.supportsQueries()) { + return DruidProcessingModule.createIntermediateResultsPool(config); + } else { + return DummyNonBlockingPool.instance(); + } + } + + @Provides + @LazySingleton + @Merging + public BlockingPool getMergeBufferPool(Task task, DruidProcessingConfig config) + { + if (task.supportsQueries()) { + return DruidProcessingModule.createMergeBufferPool(config); + } else { + if (config.isNumMergeBuffersConfigured()) { + log.warn( + "Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not support queries", + config.getNumThreads(), + task.getId(), + task.getType() + ); + } + return DummyBlockingPool.instance(); + } + } + + @Provides + @LazySingleton + @Merging + public GroupByResourcesReservationPool getGroupByResourcesReservationPool( + @Merging BlockingPool mergeBufferPool, + GroupByQueryConfig groupByQueryConfig + ) + { + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 9b882e2e8d2b..003b39e606b0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -176,7 +176,9 @@ default Set getInputSourceResources() throws UOE /** * True if this task type embeds a query stack, and therefore should preload resources (like broadcast tables) - * that may be needed by queries. + * that may be needed by queries. Tasks supporting queries are also allocated processing buffers, processing threads + * and merge buffers. Those which do not should not assume that these resources are present and must explicitly allocate + * any direct buffers or processing pools if required. * * If true, {@link #getQueryRunner(Query)} does not necessarily return nonnull query runners. For example, * MSQWorkerTask returns true from this method (because it embeds a query stack for running multi-stage queries) diff --git a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java new file mode 100644 index 000000000000..efb9a53776a6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.error.DruidException; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Implementation of {@link QueryProcessingPool} that throws when any query execution task unit is submitted to it. It is + * semantically shutdown from the moment it is created, and since the shutdown methods are supposed to be idempotent, + * they do not throw like the execution methods + */ +public class NoopQueryProcessingPool implements QueryProcessingPool +{ + private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool(); + + public static NoopQueryProcessingPool instance() + { + return INSTANCE; + } + + @Override + public ListenableFuture submitRunnerTask(PrioritizedQueryRunnerCallable task) + { + throw unsupportedException(); + } + + @Override + public ListenableFuture submit(Callable callable) + { + throw unsupportedException(); + } + + @Override + public ListenableFuture submit(Runnable runnable) + { + throw unsupportedException(); + } + + @Override + public ListenableFuture submit(Runnable runnable, T t) + { + throw unsupportedException(); + } + + @Override + public List> invokeAll(Collection> collection) + { + throw unsupportedException(); + } + + @Override + public List> invokeAll(Collection> collection, long l, TimeUnit timeUnit) + { + throw unsupportedException(); + } + + @Override + public void shutdown() + { + // No op, since it is already shutdown + } + + @Override + public List shutdownNow() + { + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() + { + return true; + } + + @Override + public boolean isTerminated() + { + return true; + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) + { + return true; + } + + @Override + public T invokeAny(Collection> collection) + { + throw unsupportedException(); + } + + @Override + public T invokeAny(Collection> collection, long l, TimeUnit timeUnit) + { + throw unsupportedException(); + } + + @Override + public void execute(Runnable runnable) + { + throw unsupportedException(); + } + + private DruidException unsupportedException() + { + return DruidException.defensive("Unexpected call made to NoopQueryProcessingPool"); + } +} diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java index d70a2157e154..bc12e929219e 100644 --- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java @@ -20,16 +20,13 @@ package org.apache.druid.guice; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.ProvisionException; -import org.apache.druid.client.cache.BackgroundCachePopulator; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.client.cache.CachePopulatorStats; -import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.NonBlockingPool; @@ -43,25 +40,22 @@ import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.DruidProcessingConfig; -import org.apache.druid.query.ExecutorServiceMonitor; import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.server.metrics.MetricsModule; import org.apache.druid.utils.JvmUtils; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; /** * This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and * thread pools on Broker. Broker does not need to be allocated an intermediate results pool. * This is separated from DruidProcessingModule to separate the needs of the broker from the historicals + * + * @see DruidProcessingModule */ - public class BrokerProcessingModule implements Module { private static final Logger log = new Logger(BrokerProcessingModule.class); @@ -69,9 +63,8 @@ public class BrokerProcessingModule implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class); - JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); - MetricsModule.register(binder, ExecutorServiceMonitor.class); + JsonConfigProvider.bind(binder, DruidProcessingModule.PROCESSING_PROPERTY_PREFIX + ".merge", BrokerParallelMergeConfig.class); + DruidProcessingModule.registerConfigsAndMonitor(binder); } @Provides @@ -82,20 +75,7 @@ public CachePopulator getCachePopulator( CacheConfig cacheConfig ) { - if (cacheConfig.getNumBackgroundThreads() > 0) { - final ExecutorService exec = Executors.newFixedThreadPool( - cacheConfig.getNumBackgroundThreads(), - new ThreadFactoryBuilder() - .setNameFormat("background-cacher-%d") - .setDaemon(true) - .setPriority(Thread.MIN_PRIORITY) - .build() - ); - - return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize()); - } else { - return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize()); - } + return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig); } @Provides @@ -113,7 +93,6 @@ public QueryProcessingPool getProcessingExecutorPool( public NonBlockingPool getIntermediateResultsPool(DruidProcessingConfig config) { verifyDirectMemory(config); - return new StupidPool<>( "intermediate processing pool", new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()), diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index a2daa25e214a..4879b5cd3c7b 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -59,13 +59,14 @@ */ public class DruidProcessingModule implements Module { + public static final String PROCESSING_PROPERTY_PREFIX = "druid.processing"; + private static final Logger log = new Logger(DruidProcessingModule.class); @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); - MetricsModule.register(binder, ExecutorServiceMonitor.class); + registerConfigsAndMonitor(binder); } @Provides @@ -75,6 +76,59 @@ public CachePopulator getCachePopulator( CachePopulatorStats cachePopulatorStats, CacheConfig cacheConfig ) + { + return createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig); + } + + @Provides + @ManageLifecycle + public QueryProcessingPool getProcessingExecutorPool( + DruidProcessingConfig config, + ExecutorServiceMonitor executorServiceMonitor, + Lifecycle lifecycle + ) + { + return createProcessingExecutorPool(config, executorServiceMonitor, lifecycle); + } + + @Provides + @LazySingleton + @Global + public NonBlockingPool getIntermediateResultsPool(DruidProcessingConfig config) + { + return createIntermediateResultsPool(config); + } + + @Provides + @LazySingleton + @Merging + public BlockingPool getMergeBufferPool(DruidProcessingConfig config) + { + return createMergeBufferPool(config); + } + + @Provides + @LazySingleton + @Merging + public GroupByResourcesReservationPool getGroupByResourcesReservationPool( + @Merging BlockingPool mergeBufferPool, + GroupByQueryConfig groupByQueryConfig + ) + { + return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); + } + + public static void registerConfigsAndMonitor(Binder binder) + { + JsonConfigProvider.bind(binder, PROCESSING_PROPERTY_PREFIX, DruidProcessingConfig.class); + MetricsModule.register(binder, ExecutorServiceMonitor.class); + } + + public static CachePopulator createCachePopulator( + ObjectMapper smileMapper, + CachePopulatorStats cachePopulatorStats, + CacheConfig cacheConfig + ) { if (cacheConfig.getNumBackgroundThreads() > 0) { final ExecutorService exec = Executors.newFixedThreadPool( @@ -92,9 +146,7 @@ public CachePopulator getCachePopulator( } } - @Provides - @ManageLifecycle - public QueryProcessingPool getProcessingExecutorPool( + public static QueryProcessingPool createProcessingExecutorPool( DruidProcessingConfig config, ExecutorServiceMonitor executorServiceMonitor, Lifecycle lifecycle @@ -109,10 +161,7 @@ public QueryProcessingPool getProcessingExecutorPool( ); } - @Provides - @LazySingleton - @Global - public NonBlockingPool getIntermediateResultsPool(DruidProcessingConfig config) + public static NonBlockingPool createIntermediateResultsPool(final DruidProcessingConfig config) { verifyDirectMemory(config); return new StupidPool<>( @@ -123,10 +172,7 @@ public NonBlockingPool getIntermediateResultsPool(DruidProcessingCon ); } - @Provides - @LazySingleton - @Merging - public BlockingPool getMergeBufferPool(DruidProcessingConfig config) + public static BlockingPool createMergeBufferPool(final DruidProcessingConfig config) { verifyDirectMemory(config); return new DefaultBlockingPool<>( @@ -135,18 +181,7 @@ public BlockingPool getMergeBufferPool(DruidProcessingConfig config) ); } - @Provides - @LazySingleton - @Merging - public GroupByResourcesReservationPool getGroupByResourcesReservationPool( - @Merging BlockingPool mergeBufferPool, - GroupByQueryConfig groupByQueryConfig - ) - { - return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig); - } - - private void verifyDirectMemory(DruidProcessingConfig config) + private static void verifyDirectMemory(DruidProcessingConfig config) { try { final long maxDirectMemory = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes(); diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java index f76b5ed940d6..85357a7fa04f 100644 --- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java @@ -28,15 +28,12 @@ import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.guice.annotations.Global; import org.apache.druid.guice.annotations.Merging; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.DruidProcessingConfig; -import org.apache.druid.query.ExecutorServiceMonitor; -import org.apache.druid.query.ForwardingQueryProcessingPool; +import org.apache.druid.query.NoopQueryProcessingPool; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByResourcesReservationPool; -import org.apache.druid.server.metrics.MetricsModule; import java.nio.ByteBuffer; @@ -46,6 +43,8 @@ * {@link org.apache.druid.query.QueryToolChest}s, and they couple query type aspects not related to processing and * caching, which Router uses, and related to processing and caching, which Router doesn't use, but they inject the * resources. + * + * @see DruidProcessingModule */ public class RouterProcessingModule implements Module { @@ -54,8 +53,7 @@ public class RouterProcessingModule implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class); - MetricsModule.register(binder, ExecutorServiceMonitor.class); + DruidProcessingModule.registerConfigsAndMonitor(binder); } @Provides @@ -65,7 +63,7 @@ public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig confi if (config.isNumThreadsConfigured()) { log.warn("numThreads[%d] configured, that is ignored on Router", config.getNumThreads()); } - return new ForwardingQueryProcessingPool(Execs.dummy()); + return NoopQueryProcessingPool.instance(); } @Provides diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index eb572850cda2..61a8ab7374e4 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -47,7 +47,6 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.Binders; import org.apache.druid.guice.CacheModule; -import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceTaskLogsModule; import org.apache.druid.guice.IndexingServiceTuningConfigModule; @@ -58,6 +57,7 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycleServer; +import org.apache.druid.guice.PeonProcessingModule; import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; @@ -205,7 +205,7 @@ public void configure(Properties properties) protected List getModules() { return ImmutableList.of( - new DruidProcessingModule(), + new PeonProcessingModule(), new QueryableModule(), new QueryRunnerFactoryModule(), new SegmentWranglerModule(),