From bf5e5e7b75c2e098997fcad87cb2c4af5c3991e5 Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 27 Apr 2016 20:59:41 +0530 Subject: [PATCH] fix #2886 (#2887) Fixes https://github.com/druid-io/druid/issues/2886 --- .../overlord/RemoteTaskRunnerFactory.java | 10 +- .../overlord/RemoteTaskRunnerFactoryTest.java | 139 ++++++++++++++++++ 2 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java index 376034ff2cd9..4901c8c21629 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java @@ -51,10 +51,9 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory workerConfigRef; - private final ScheduledExecutorService cleanupExec; private final SimpleResourceManagementConfig config; private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig; - private final ScheduledExecutorService resourceManagementExec; + private final ScheduledExecutorFactory factory; @Inject public RemoteTaskRunnerFactory( @@ -75,10 +74,9 @@ public RemoteTaskRunnerFactory( this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.workerConfigRef = workerConfigRef; - this.cleanupExec = factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"); this.config = config; this.resourceManagementSchedulerConfig = resourceManagementSchedulerConfig; - this.resourceManagementExec = factory.create(1, "RemoteTaskRunner-ResourceManagement--%d"); + this.factory = factory; } @Override @@ -90,7 +88,7 @@ public RemoteTaskRunner build() config, workerConfigRef, resourceManagementSchedulerConfig, - resourceManagementExec + factory.create(1, "RemoteTaskRunner-ResourceManagement--%d") ); } else { resourceManagementStrategy = new NoopResourceManagementStrategy<>(); @@ -106,7 +104,7 @@ public RemoteTaskRunner build() .build(), httpClient, workerConfigRef, - cleanupExec, + factory.create(1, "RemoteTaskRunner-Scheduled-Cleanup--%d"), resourceManagementStrategy ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java new file mode 100644 index 000000000000..fe6020ae4c51 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerFactoryTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package io.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.base.Supplier; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.concurrent.ScheduledExecutors; +import com.metamx.http.client.HttpClient; +import io.druid.curator.PotentiallyGzippedCompressionProvider; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig; +import io.druid.indexing.overlord.autoscaling.SimpleResourceManagementConfig; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.initialization.IndexerZkConfig; +import io.druid.server.initialization.ZkPathsConfig; +import junit.framework.Assert; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +public class RemoteTaskRunnerFactoryTest +{ + private static final Joiner joiner = Joiner.on("/"); + private static final String basePath = "/test/druid"; + private TestingCluster testingCluster; + private CuratorFramework cf; + private ObjectMapper jsonMapper; + + + @Before + public void setUp() throws Exception + { + TestUtils testUtils = new TestUtils(); + jsonMapper = testUtils.getTestObjectMapper(); + + testingCluster = new TestingCluster(1); + testingCluster.start(); + + cf = CuratorFrameworkFactory.builder() + .connectString(testingCluster.getConnectString()) + .retryPolicy(new ExponentialBackoffRetry(1, 10)) + .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) + .build(); + cf.start(); + cf.blockUntilConnected(); + } + + @After + public void tearDown() throws Exception + { + cf.close(); + testingCluster.stop(); + } + + @Test + public void testExecNotSharedBetweenRunners() + { + final AtomicInteger executorCount = new AtomicInteger(0); + RemoteTaskRunnerConfig config = new RemoteTaskRunnerConfig(); + IndexerZkConfig indexerZkConfig = new IndexerZkConfig( + new ZkPathsConfig() + { + @Override + public String getBase() + { + return basePath; + } + }, null, null, null, null, null + ); + + HttpClient httpClient = EasyMock.createMock(HttpClient.class); + Supplier workerBehaviorConfig = EasyMock.createMock(Supplier.class); + ScheduledExecutorFactory executorFactory = new ScheduledExecutorFactory() + { + @Override + public ScheduledExecutorService create(int i, String s) + { + executorCount.incrementAndGet(); + return ScheduledExecutors.fixed(i, s); + } + }; + SimpleResourceManagementConfig resourceManagementConfig = new SimpleResourceManagementConfig(); + ResourceManagementSchedulerConfig resourceManagementSchedulerConfig = new ResourceManagementSchedulerConfig() + { + @Override + public boolean isDoAutoscale() + { + return true; + } + }; + RemoteTaskRunnerFactory factory = new RemoteTaskRunnerFactory( + cf, + config, + indexerZkConfig, + jsonMapper, + httpClient, + workerBehaviorConfig, + executorFactory, + resourceManagementConfig, + resourceManagementSchedulerConfig + ); + Assert.assertEquals(0, executorCount.get()); + RemoteTaskRunner remoteTaskRunner1 = factory.build(); + Assert.assertEquals(2, executorCount.get()); + RemoteTaskRunner remoteTaskRunner2 = factory.build(); + Assert.assertEquals(4, executorCount.get()); + + } +}