From 5678a76818c7ca00b4f558a65f7698068a798754 Mon Sep 17 00:00:00 2001 From: Nishant Date: Mon, 2 May 2016 20:47:38 +0530 Subject: [PATCH 1/3] Optimize CostBalancerStrategy Ignore benchmark test in normal run fix test review comments fix compilation fix test --- .../coordinator/BalancerStrategyFactory.java | 4 +- .../coordinator/CostBalancerStrategy.java | 33 +++-- .../CostBalancerStrategyFactory.java | 16 ++- .../server/coordinator/DruidCoordinator.java | 18 ++- .../DruidCoordinatorRuntimeParams.java | 2 - .../RandomBalancerStrategyFactory.java | 8 ++ .../CostBalancerStrategyBenchmark.java | 114 ++++++++++++++++++ .../CostBalancerStrategyTest.java | 72 ++++------- .../DruidCoordinatorBalancerTest.java | 19 ++- .../DruidCoordinatorRuleRunnerTest.java | 30 +++++ .../coordinator/rules/LoadRuleTest.java | 21 +++- 11 files changed, 258 insertions(+), 79 deletions(-) create mode 100644 server/src/test/java/io/druid/server/coordination/CostBalancerStrategyBenchmark.java diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index 1f840c6c86a9..be11848dc0be 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -20,7 +20,9 @@ import org.joda.time.DateTime; -public interface BalancerStrategyFactory +import java.io.Closeable; + +public interface BalancerStrategyFactory extends Closeable { public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 2a52ba16b1aa..32a66ef9e62b 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; public class CostBalancerStrategy implements BalancerStrategy { @@ -41,12 +41,23 @@ public class CostBalancerStrategy implements BalancerStrategy private static final long SEVEN_DAYS_IN_MILLIS = 7 * DAY_IN_MILLIS; private static final long THIRTY_DAYS_IN_MILLIS = 30 * DAY_IN_MILLIS; private final long referenceTimestamp; - private final int threadCount; + private final ListeningExecutorService exec; - public CostBalancerStrategy(DateTime referenceTimestamp, int threadCount) + public static long gapMillis(Interval interval1, Interval interval2) + { + if (interval1.getStartMillis() > interval2.getEndMillis()) { + return interval1.getStartMillis() - interval2.getEndMillis(); + } else if (interval2.getStartMillis() > interval1.getEndMillis()) { + return interval2.getStartMillis() - interval1.getEndMillis(); + } else { + return 0; + } + } + + public CostBalancerStrategy(DateTime referenceTimestamp, ExecutorService exec) { this.referenceTimestamp = referenceTimestamp.getMillis(); - this.threadCount = threadCount; + this.exec = MoreExecutors.listeningDecorator(exec); } @Override @@ -85,7 +96,7 @@ public ServerHolder findNewSegmentHomeBalancer( */ public double computeJointSegmentCosts(final DataSegment segment1, final DataSegment segment2) { - final Interval gap = segment1.getInterval().gap(segment2.getInterval()); + final long gapMillis = gapMillis(segment1.getInterval(), segment2.getInterval()); final double baseCost = Math.min(segment1.getSize(), segment2.getSize()); double recencyPenalty = 1; @@ -103,10 +114,9 @@ public double computeJointSegmentCosts(final DataSegment segment1, final DataSeg } /** gap is null if the two segment intervals overlap or if they're adjacent */ - if (gap == null) { + if (gapMillis == 0) { gapPenalty = 2; } else { - long gapMillis = gap.toDurationMillis(); if (gapMillis < THIRTY_DAYS_IN_MILLIS) { gapPenalty = 2 - gapMillis / THIRTY_DAYS_IN_MILLIS; } @@ -151,8 +161,8 @@ public double calculateInitialTotalCost(final List serverHolders) * @param serverHolders A list of ServerHolders for a particular tier. * * @return The normalization value (the sum of the diagonal entries in the - * pairwise cost matrix). This is the cost of a cluster if each - * segment were to get its own historical node. + * pairwise cost matrix). This is the cost of a cluster if each + * segment were to get its own historical node. */ public double calculateNormalization(final List serverHolders) { @@ -234,12 +244,11 @@ protected Pair chooseBestServer( { Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); - ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadCount)); List>> futures = Lists.newArrayList(); for (final ServerHolder server : serverHolders) { futures.add( - service.submit( + exec.submit( new Callable>() { @Override @@ -264,9 +273,7 @@ public Pair call() throws Exception catch (Exception e) { log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit(); } - service.shutdown(); return bestServer; } - } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java index f00b5b98e8f9..63c2f1c195e5 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java @@ -20,18 +20,28 @@ import org.joda.time.DateTime; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class CostBalancerStrategyFactory implements BalancerStrategyFactory { - private final int threadCount; + private final ExecutorService exec; public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount) { - this.threadCount = costBalancerStrategyThreadCount; + this.exec = Executors.newFixedThreadPool(costBalancerStrategyThreadCount); } @Override public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) { - return new CostBalancerStrategy(referenceTimestamp, threadCount); + return new CostBalancerStrategy(referenceTimestamp, exec); + } + + @Override + public void close() throws IOException + { + exec.shutdownNow(); } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 3b2d20b9e0ae..64d02a346f43 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -698,13 +698,21 @@ public void run() .withEmitter(emitter) .withBalancerStrategyFactory(factory) .build(); - - for (DruidCoordinatorHelper helper : helpers) { - // Don't read state and run state in the same helper otherwise racy conditions may exist - if (leader && startingLeaderCounter == leaderCounter) { - params = helper.run(params); + try { + for (DruidCoordinatorHelper helper : helpers) { + // Don't read state and run state in the same helper otherwise racy conditions may exist + if (leader && startingLeaderCounter == leaderCounter) { + params = helper.run(params); + } } } + finally { + if (params.getBalancerStrategyFactory() != null) { + params.getBalancerStrategyFactory().close(); + } + } + + } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 742999ce2439..695abc06e60f 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.metamx.common.guava.Comparators; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidDataSource; import io.druid.metadata.MetadataRuleManager; @@ -206,7 +205,6 @@ public static class Builder this.stats = new CoordinatorStats(); this.coordinatorDynamicConfig = new CoordinatorDynamicConfig.Builder().build(); this.balancerReferenceTimestamp = DateTime.now(); - this.strategyFactory = new CostBalancerStrategyFactory(1); } Builder( diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java index ed1762d76d04..773ff16efd4f 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java @@ -20,6 +20,8 @@ import org.joda.time.DateTime; +import java.io.IOException; + public class RandomBalancerStrategyFactory implements BalancerStrategyFactory { @Override @@ -27,4 +29,10 @@ public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) { return new RandomBalancerStrategy(); } + + @Override + public void close() throws IOException + { + // No-op + } } diff --git a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyBenchmark.java b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyBenchmark.java new file mode 100644 index 000000000000..0892f7a25c80 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyBenchmark.java @@ -0,0 +1,114 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.carrotsearch.junitbenchmarks.AbstractBenchmark; +import com.carrotsearch.junitbenchmarks.BenchmarkOptions; +import io.druid.server.coordinator.CostBalancerStrategy; +import io.druid.server.coordinator.CostBalancerStrategyFactory; +import io.druid.server.coordinator.ServerHolder; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +@Ignore +@RunWith(Parameterized.class) +public class CostBalancerStrategyBenchmark extends AbstractBenchmark +{ + @Parameterized.Parameters + public static List factoryClasses() + { + return Arrays.asList( + (CostBalancerStrategyFactory[]) Arrays.asList( + new CostBalancerStrategyFactory(1) + ).toArray(), + (CostBalancerStrategyFactory[]) Arrays.asList( + new CostBalancerStrategyFactory(4) + ).toArray() + ); + } + + private final CostBalancerStrategy strategy; + + public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory) + { + this.strategy = (CostBalancerStrategy) factory.createBalancerStrategy(DateTime.now()); + } + + private static List serverHolderList; + volatile ServerHolder selected; + + @BeforeClass + public static void setup() + { + serverHolderList = CostBalancerStrategyTest.setupDummyCluster(5, 20000); + } + + @AfterClass + public static void tearDown(){ + serverHolderList = null; + } + + @Test + @BenchmarkOptions(warmupRounds = 10, benchmarkRounds = 1000) + public void testBenchmark() throws InterruptedException + { + DataSegment segment = CostBalancerStrategyTest.getSegment(1000, "testds", interval1); + selected = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + } + + + // Benchmark Joda Interval Gap impl vs CostBalancer.gapMillis + private final Interval interval1 = new Interval("2015-01-01T01:00:00Z/2015-01-01T02:00:00Z"); + private final Interval interval2 = new Interval("2015-02-01T01:00:00Z/2015-02-01T02:00:00Z"); + volatile Long sum; + + @BenchmarkOptions(warmupRounds = 1000, benchmarkRounds = 1000000) + @Test + public void testJodaGap() + { + long diff = 0; + for (int i = 0; i < 1000; i++) { + diff = diff + interval1.gap(interval2).toDurationMillis(); + } + sum = diff; + } + + @BenchmarkOptions(warmupRounds = 1000, benchmarkRounds = 1000000) + @Test + public void testBalancerGapMillis() + { + long diff = 0; + for (int i = 0; i < 1000; i++) { + diff = diff + CostBalancerStrategy.gapMillis(interval1, interval2); + } + sum = diff; + } + +} diff --git a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java index 07884b78f910..d4e338332cb4 100644 --- a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java @@ -19,9 +19,12 @@ package io.druid.server.coordination; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; +import io.druid.concurrent.Execs; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CostBalancerStrategy; import io.druid.server.coordinator.LoadQueuePeonTester; @@ -37,39 +40,39 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; public class CostBalancerStrategyTest { - private final List serverHolderList = Lists.newArrayList(); - private final Interval day = DateTime.now().toDateMidnight().toInterval(); + private static final Interval day = DateTime.now().toDateMidnight().toInterval(); /** - * Create Druid cluster with 10 servers having 100 segments each, and 1 server with 98 segment + * Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment * Cost Balancer Strategy should assign the next segment to the server with less segments. */ - public void setupDummyCluster(int serverCount, int maxSegments) + public static List setupDummyCluster(int serverCount, int maxSegments) { + List serverHolderList = Lists.newArrayList(); // Create 10 servers with current size being 3K & max size being 10K // Each having having 100 segments for (int i = 0; i < serverCount; i++) { LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); - ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class); - EasyMock.expect(druidServer.getName()).andReturn("DruidServer_Name_" + i).anyTimes(); - EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes(); - EasyMock.expect(druidServer.getMaxSize()).andReturn(10000000L).anyTimes(); - - EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); Map segments = Maps.newHashMap(); for (int j = 0; j < maxSegments; j++) { DataSegment segment = getSegment(j); segments.put(segment.getIdentifier(), segment); - EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes(); } - EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes(); - - EasyMock.replay(druidServer); - serverHolderList.add(new ServerHolder(druidServer, fromPeon)); + serverHolderList.add( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + 3000L, + ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), + ImmutableMap.copyOf(segments) + ), + fromPeon + )); } // The best server to be available for next segment assignment has only 98 Segments @@ -90,6 +93,7 @@ public void setupDummyCluster(int serverCount, int maxSegments) EasyMock.replay(druidServer); serverHolderList.add(new ServerHolder(druidServer, fromPeon)); + return serverHolderList; } /** @@ -99,12 +103,12 @@ public void setupDummyCluster(int serverCount, int maxSegments) * * @return segment */ - private DataSegment getSegment(int index) + public static DataSegment getSegment(int index) { return getSegment(index, "DUMMY", day); } - private DataSegment getSegment(int index, String dataSource, Interval interval) + public static DataSegment getSegment(int index, String dataSource, Interval interval) { // Not using EasyMock as it hampers the performance of multithreads. DataSegment segment = new DataSegment( @@ -117,11 +121,11 @@ private DataSegment getSegment(int index, String dataSource, Interval interval) @Test public void testCostBalancerMultiThreadedStrategy() throws InterruptedException { - setupDummyCluster(10, 20); + List serverHolderList = setupDummyCluster(10, 20); DataSegment segment = getSegment(1000); final DateTime referenceTimestamp = new DateTime("2014-01-01"); - BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 4); + BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, Executors.newFixedThreadPool(4)); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); @@ -130,44 +134,21 @@ public void testCostBalancerMultiThreadedStrategy() throws InterruptedException @Test public void testCostBalancerSingleThreadStrategy() throws InterruptedException { - setupDummyCluster(10, 20); + List serverHolderList = setupDummyCluster(10, 20); DataSegment segment = getSegment(1000); final DateTime referenceTimestamp = new DateTime("2014-01-01"); - BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, 1); + BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, Executors.newFixedThreadPool(1)); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); } - @Test - @Ignore - public void testBenchmark() throws InterruptedException - { - setupDummyCluster(100, 500); - DataSegment segment = getSegment(1000); - - BalancerStrategy singleThreadStrategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 1); - long start = System.currentTimeMillis(); - singleThreadStrategy.findNewSegmentHomeReplicator(segment, serverHolderList); - long end = System.currentTimeMillis(); - long latencySingleThread = end - start; - - BalancerStrategy strategy = new CostBalancerStrategy(DateTime.now(DateTimeZone.UTC), 4); - start = System.currentTimeMillis(); - strategy.findNewSegmentHomeReplicator(segment, serverHolderList); - end = System.currentTimeMillis(); - long latencyMultiThread = end - start; - - System.err.println("Latency - Single Threaded (ms): " + latencySingleThread); - System.err.println("Latency - Multi Threaded (ms): " + latencyMultiThread); - } - @Test public void testComputeJointSegmentCost() { DateTime referenceTime = new DateTime("2014-01-01T00:00:00"); - CostBalancerStrategy strategy = new CostBalancerStrategy(referenceTime, 4); + CostBalancerStrategy strategy = new CostBalancerStrategy(referenceTime, Executors.newFixedThreadPool(4)); double segmentCost = strategy.computeJointSegmentCosts( getSegment( 100, @@ -187,7 +168,6 @@ public void testComputeJointSegmentCost() ) ); Assert.assertEquals(138028.62811791385d, segmentCost, 0); - } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 19aaf6abc228..a0074b132660 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -134,7 +135,7 @@ public void tearDown() throws Exception @Test - public void testMoveToEmptyServerBalancer() + public void testMoveToEmptyServerBalancer() throws IOException { EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce(); EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce(); @@ -197,19 +198,21 @@ public void testMoveToEmptyServerBalancer() MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size()); + params.getBalancerStrategyFactory().close(); } @Test - public void testRun1() + public void testRun1() throws IOException { // Mock some servers of different usages @@ -272,16 +275,18 @@ public void testRun1() new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .build() ) - .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); + params.getBalancerStrategyFactory().close(); } @Test - public void testRun2() + public void testRun2() throws IOException { // Mock some servers of different usages EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce(); @@ -366,11 +371,13 @@ public void testRun2() MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); + params.getBalancerStrategyFactory().close(); } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index cbc59e9fda0e..c727f45a6a8a 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -182,6 +182,8 @@ public void testRunThreeTiersOneReplicant() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) .build(); @@ -195,6 +197,7 @@ public void testRunThreeTiersOneReplicant() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } /** @@ -276,6 +279,7 @@ public void testRunTwoTiersTwoReplicants() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -288,6 +292,7 @@ public void testRunTwoTiersTwoReplicants() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } /** @@ -365,6 +370,7 @@ public void testRunTwoTiersWithExistingSegments() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -377,6 +383,7 @@ public void testRunTwoTiersWithExistingSegments() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -429,6 +436,7 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -436,6 +444,7 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception EasyMock.verify(emitter); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -548,6 +557,7 @@ public void testDropRemove() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -557,6 +567,7 @@ public void testDropRemove() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); EasyMock.verify(coordinator); + params.getBalancerStrategyFactory().close(); } @Test @@ -625,6 +636,7 @@ public void testDropTooManyInSameTier() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -635,6 +647,7 @@ public void testDropTooManyInSameTier() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -709,6 +722,7 @@ public void testDropTooManyInDifferentTiers() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -719,6 +733,7 @@ public void testDropTooManyInDifferentTiers() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -789,6 +804,7 @@ public void testDontDropInDifferentTiers() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -799,6 +815,7 @@ public void testDontDropInDifferentTiers() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -882,6 +899,7 @@ public void testDropServerActuallyServesSegment() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -892,6 +910,7 @@ public void testDropServerActuallyServesSegment() throws Exception EasyMock.verify(mockPeon); EasyMock.verify(anotherMockPeon); + params.getBalancerStrategyFactory().close(); } /** @@ -955,6 +974,8 @@ public void testReplicantThrottle() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); @@ -975,6 +996,7 @@ public void testReplicantThrottle() throws Exception 1, 0 ); + afterParams.getBalancerStrategyFactory().close(); afterParams = ruleRunner.run( new DruidCoordinatorRuntimeParams.Builder() @@ -982,6 +1004,8 @@ public void testReplicantThrottle() throws Exception .withEmitter(emitter) .withAvailableSegments(Arrays.asList(overFlowSegment)) .withDatabaseRuleManager(databaseRuleManager) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build() ); @@ -992,6 +1016,7 @@ public void testReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); + afterParams.getBalancerStrategyFactory().close(); } /** @@ -1073,6 +1098,8 @@ public void testReplicantThrottleAcrossTiers() throws Exception .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build(); @@ -1086,6 +1113,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } @Test @@ -1168,6 +1196,7 @@ public void testDropReplicantThrottle() throws Exception .withAvailableSegments(longerAvailableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) + .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -1176,6 +1205,7 @@ public void testDropReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24); EasyMock.verify(mockPeon); + params.getBalancerStrategyFactory().close(); } private void mockCoordinator() diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index d2d254ff4141..920447d66340 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -33,6 +33,7 @@ import io.druid.client.DruidServer; import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.CostBalancerStrategyFactory; import io.druid.server.coordinator.DruidCluster; import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; import io.druid.server.coordinator.LoadPeonCallback; @@ -189,18 +190,22 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); + CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) + .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2); + costBalancerStrategyFactory.close(); } @Test @@ -291,19 +296,22 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); - + CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) + .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1); + costBalancerStrategyFactory.close(); } @Test @@ -373,18 +381,21 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); - +CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withReplicationManager(throttler) + .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); + costBalancerStrategyFactory.close(); } @Test @@ -470,6 +481,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); + CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); CoordinatorStats stats = rule.run( null, @@ -477,10 +489,13 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) + .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); + costBalancerStrategyFactory.close(); } } From 8c02b873b76e2300a9f698fd78d31842f713871b Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 4 May 2016 22:24:06 +0530 Subject: [PATCH 2/3] review comments --- .../server/coordinator/CostBalancerStrategy.java | 4 ++-- .../coordinator/CostBalancerStrategyFactory.java | 7 ++++--- .../coordination/CostBalancerStrategyTest.java | 16 +++++++++++++--- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java index 32a66ef9e62b..9373db2b421c 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategy.java @@ -54,10 +54,10 @@ public static long gapMillis(Interval interval1, Interval interval2) } } - public CostBalancerStrategy(DateTime referenceTimestamp, ExecutorService exec) + public CostBalancerStrategy(DateTime referenceTimestamp, ListeningExecutorService exec) { this.referenceTimestamp = referenceTimestamp.getMillis(); - this.exec = MoreExecutors.listeningDecorator(exec); + this.exec = exec; } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java index 63c2f1c195e5..bbaebe6c7cdf 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java @@ -18,19 +18,20 @@ */ package io.druid.server.coordinator; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.joda.time.DateTime; import java.io.IOException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CostBalancerStrategyFactory implements BalancerStrategyFactory { - private final ExecutorService exec; + private final ListeningExecutorService exec; public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount) { - this.exec = Executors.newFixedThreadPool(costBalancerStrategyThreadCount); + this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount)); } @Override diff --git a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java index d4e338332cb4..be0eec6ce5bc 100644 --- a/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordination/CostBalancerStrategyTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.concurrent.Execs; @@ -125,7 +126,10 @@ public void testCostBalancerMultiThreadedStrategy() throws InterruptedException DataSegment segment = getSegment(1000); final DateTime referenceTimestamp = new DateTime("2014-01-01"); - BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, Executors.newFixedThreadPool(4)); + BalancerStrategy strategy = new CostBalancerStrategy( + referenceTimestamp, + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); @@ -138,7 +142,10 @@ public void testCostBalancerSingleThreadStrategy() throws InterruptedException DataSegment segment = getSegment(1000); final DateTime referenceTimestamp = new DateTime("2014-01-01"); - BalancerStrategy strategy = new CostBalancerStrategy(referenceTimestamp, Executors.newFixedThreadPool(1)); + BalancerStrategy strategy = new CostBalancerStrategy( + referenceTimestamp, + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + ); ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); Assert.assertNotNull("Should be able to find a place for new segment!!", holder); Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); @@ -148,7 +155,10 @@ public void testCostBalancerSingleThreadStrategy() throws InterruptedException public void testComputeJointSegmentCost() { DateTime referenceTime = new DateTime("2014-01-01T00:00:00"); - CostBalancerStrategy strategy = new CostBalancerStrategy(referenceTime, Executors.newFixedThreadPool(4)); + CostBalancerStrategy strategy = new CostBalancerStrategy( + referenceTime, + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + ); double segmentCost = strategy.computeJointSegmentCosts( getSegment( 100, From e81d4285f5b35d3e1b39613ab39874c6a59e9077 Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 5 May 2016 16:49:10 +0530 Subject: [PATCH 3/3] review comment --- .../server/coordinator/DruidCoordinator.java | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 64d02a346f43..ebcd78adfece 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -686,19 +686,17 @@ public void run() } } - BalancerStrategyFactory factory = - new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads()); - - // Do coordinator stuff. - DruidCoordinatorRuntimeParams params = - DruidCoordinatorRuntimeParams.newBuilder() - .withStartTime(startTime) - .withDatasources(metadataSegmentManager.getInventory()) - .withDynamicConfigs(getDynamicConfigs()) - .withEmitter(emitter) - .withBalancerStrategyFactory(factory) - .build(); - try { + try (BalancerStrategyFactory factory = + new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) { + // Do coordinator stuff. + DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams.newBuilder() + .withStartTime(startTime) + .withDatasources(metadataSegmentManager.getInventory()) + .withDynamicConfigs(getDynamicConfigs()) + .withEmitter(emitter) + .withBalancerStrategyFactory(factory) + .build(); for (DruidCoordinatorHelper helper : helpers) { // Don't read state and run state in the same helper otherwise racy conditions may exist if (leader && startingLeaderCounter == leaderCounter) { @@ -706,13 +704,6 @@ public void run() } } } - finally { - if (params.getBalancerStrategyFactory() != null) { - params.getBalancerStrategyFactory().close(); - } - } - - } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();