Skip to content

Commit

Permalink
Non querying tasks shouldn't use processing buffers / merge buffers (#…
Browse files Browse the repository at this point in the history
…16887)

Tasks that do not support querying or query processing i.e. supportsQueries = false do not require processing threads, processing buffers, and merge buffers.
  • Loading branch information
LakshSingla authored Sep 10, 2024
1 parent 78775ad commit 72fbaf2
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DummyBlockingPool;
import org.apache.druid.collections.DummyNonBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.NoopQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;

import java.nio.ByteBuffer;

/**
* This module fulfills the dependency injection of query processing and caching resources: buffer pools and
* thread pools on Peon selectively. Only the peons for the tasks supporting queries need to allocate direct buffers
* and thread pools. Thus, this is separate from the {@link DruidProcessingModule} to separate the needs of the peons and
* the historicals
*
* @see DruidProcessingModule
*/
public class PeonProcessingModule implements Module
{
private static final Logger log = new Logger(PeonProcessingModule.class);

@Override
public void configure(Binder binder)
{
DruidProcessingModule.registerConfigsAndMonitor(binder);
}

@Provides
@LazySingleton
public CachePopulator getCachePopulator(
@Smile ObjectMapper smileMapper,
CachePopulatorStats cachePopulatorStats,
CacheConfig cacheConfig
)
{
return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
}

@Provides
@ManageLifecycle
public QueryProcessingPool getProcessingExecutorPool(
Task task,
DruidProcessingConfig config,
ExecutorServiceMonitor executorServiceMonitor,
Lifecycle lifecycle
)
{
if (task.supportsQueries()) {
return DruidProcessingModule.createProcessingExecutorPool(config, executorServiceMonitor, lifecycle);
} else {
if (config.isNumThreadsConfigured()) {
log.warn(
"Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not support queries",
config.getNumThreads(),
task.getId(),
task.getType()
);
}
return NoopQueryProcessingPool.instance();
}
}

@Provides
@LazySingleton
@Global
public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(Task task, DruidProcessingConfig config)
{
if (task.supportsQueries()) {
return DruidProcessingModule.createIntermediateResultsPool(config);
} else {
return DummyNonBlockingPool.instance();
}
}

@Provides
@LazySingleton
@Merging
public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingConfig config)
{
if (task.supportsQueries()) {
return DruidProcessingModule.createMergeBufferPool(config);
} else {
if (config.isNumMergeBuffersConfigured()) {
log.warn(
"Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not support queries",
config.getNumThreads(),
task.getId(),
task.getType()
);
}
return DummyBlockingPool.instance();
}
}

@Provides
@LazySingleton
@Merging
public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
GroupByQueryConfig groupByQueryConfig
)
{
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ default Set<ResourceAction> getInputSourceResources() throws UOE

/**
* True if this task type embeds a query stack, and therefore should preload resources (like broadcast tables)
* that may be needed by queries.
* that may be needed by queries. Tasks supporting queries are also allocated processing buffers, processing threads
* and merge buffers. Those which do not should not assume that these resources are present and must explicitly allocate
* any direct buffers or processing pools if required.
*
* If true, {@link #getQueryRunner(Query)} does not necessarily return nonnull query runners. For example,
* MSQWorkerTask returns true from this method (because it embeds a query stack for running multi-stage queries)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* Implementation of {@link QueryProcessingPool} that throws when any query execution task unit is submitted to it. It is
* semantically shutdown from the moment it is created, and since the shutdown methods are supposed to be idempotent,
* they do not throw like the execution methods
*/
public class NoopQueryProcessingPool implements QueryProcessingPool
{
private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool();

public static NoopQueryProcessingPool instance()
{
return INSTANCE;
}

@Override
public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
{
throw unsupportedException();
}

@Override
public <T> ListenableFuture<T> submit(Callable<T> callable)
{
throw unsupportedException();
}

@Override
public ListenableFuture<?> submit(Runnable runnable)
{
throw unsupportedException();
}

@Override
public <T> ListenableFuture<T> submit(Runnable runnable, T t)
{
throw unsupportedException();
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
{
throw unsupportedException();
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
{
throw unsupportedException();
}

@Override
public void shutdown()
{
// No op, since it is already shutdown
}

@Override
public List<Runnable> shutdownNow()
{
return Collections.emptyList();
}

@Override
public boolean isShutdown()
{
return true;
}

@Override
public boolean isTerminated()
{
return true;
}

@Override
public boolean awaitTermination(long l, TimeUnit timeUnit)
{
return true;
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> collection)
{
throw unsupportedException();
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
{
throw unsupportedException();
}

@Override
public void execute(Runnable runnable)
{
throw unsupportedException();
}

private DruidException unsupportedException()
{
return DruidException.defensive("Unexpected call made to NoopQueryProcessingPool");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
package org.apache.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import org.apache.druid.client.cache.BackgroundCachePopulator;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
Expand All @@ -43,35 +40,31 @@
import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

/**
* This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and
* thread pools on Broker. Broker does not need to be allocated an intermediate results pool.
* This is separated from DruidProcessingModule to separate the needs of the broker from the historicals
*
* @see DruidProcessingModule
*/

public class BrokerProcessingModule implements Module
{
private static final Logger log = new Logger(BrokerProcessingModule.class);

@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.processing.merge", BrokerParallelMergeConfig.class);
JsonConfigProvider.bind(binder, "druid.processing", DruidProcessingConfig.class);
MetricsModule.register(binder, ExecutorServiceMonitor.class);
JsonConfigProvider.bind(binder, DruidProcessingModule.PROCESSING_PROPERTY_PREFIX + ".merge", BrokerParallelMergeConfig.class);
DruidProcessingModule.registerConfigsAndMonitor(binder);
}

@Provides
Expand All @@ -82,20 +75,7 @@ public CachePopulator getCachePopulator(
CacheConfig cacheConfig
)
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
final ExecutorService exec = Executors.newFixedThreadPool(
cacheConfig.getNumBackgroundThreads(),
new ThreadFactoryBuilder()
.setNameFormat("background-cacher-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.build()
);

return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
} else {
return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
}
return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
}

@Provides
Expand All @@ -113,7 +93,6 @@ public QueryProcessingPool getProcessingExecutorPool(
public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
{
verifyDirectMemory(config);

return new StupidPool<>(
"intermediate processing pool",
new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()),
Expand Down
Loading

0 comments on commit 72fbaf2

Please sign in to comment.