Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CachingCostBalancerStrategy #61

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
69d0bbb
Add CachingCostBalancerStrategy
dgolitsyn May 29, 2017
108f12a
Fail fast if serverInventoryView is null in CachingCostBalancerStrate…
dgolitsyn May 30, 2017
fe80785
Fix races in CachingCostBalancerStrategyFactory
dgolitsyn May 30, 2017
7a86471
Fix CachingCostBalancerStrategyBenchmark
dgolitsyn May 30, 2017
30e6c76
Benchmark fixes
leventov May 30, 2017
c58ae94
Address review comments
dgolitsyn Jun 1, 2017
2dba9ca
LifecycleLock in CachingCostBalancerStrategyFactory
dgolitsyn Jun 22, 2017
231b232
Fix typo
dgolitsyn Jun 22, 2017
73a716e
Address review comments
dgolitsyn Jun 22, 2017
3bc9376
Rename ServerView.ServerCallback to ServerRemovedCallback for readabi…
leventov Jun 23, 2017
eeb7b6c
Use LifecycleLock in CachingCostBalancerStrategyFactory
dgolitsyn Jul 3, 2017
5975448
Use DurationGranularity in SegmentCostCache
dgolitsyn Jul 4, 2017
3f524d1
Add test for CachingCostBalancerStrategy
dgolitsyn Jul 4, 2017
034c56a
Added docs to SegmentsCostCache
dgolitsyn Jul 4, 2017
43ce54b
Include segments that are going to be loaded to total cost in Caching…
dgolitsyn Jul 4, 2017
342a67b
Some refactoring and style
leventov Jul 6, 2017
cd27ca4
Typo
leventov Jul 6, 2017
23fa1e8
ValueAndSegment -> SegmentAndValue
leventov Jul 6, 2017
8dfb995
Add docs to SegmentsCostCache and rename SegmentAndValue to SegmentAn…
dgolitsyn Jul 6, 2017
250a7b7
Reduce bucketInterval in SegmentsCostCache from 30 to 15
dgolitsyn Jul 6, 2017
5cdb422
Fix CachingCostBalancerStrategyBenchmark
dgolitsyn Jul 6, 2017
843c462
Add link to plotter
leventov Jul 7, 2017
cae5e83
Merge branch '0.10.1-mmx' into caching-cost-balancer
leventov Aug 29, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import io.druid.server.coordinator.cost.SegmentsCostCache;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
public class CachingCostBalancerStrategyBenchmark
{
private static final int NUMBER_OF_SEGMENTS = 100000;
private static final int NUMBER_OF_QUERIES = 500;

private static final long DAYS_IN_MONTH = 30;

private final DateTime referenceTime = new DateTime("2014-01-01T00:00:00");
private final List<DataSegment> segments = new ArrayList<>();
private final List<DataSegment> segmentQueries = new ArrayList<>();
private final int seed = ThreadLocalRandom.current().nextInt();

private SegmentsCostCache segmentsCostCache;

@Setup
public void createSegments()
{
Random random = new Random(seed);
SegmentsCostCache.Builder prototype = SegmentsCostCache.builder();
for (int i = 0; i < NUMBER_OF_SEGMENTS; ++i) {
DataSegment segment = createSegment(random.nextInt((int)TimeUnit.DAYS.toHours(DAYS_IN_MONTH)));
segments.add(segment);
prototype.addSegment(segment);
}
segmentsCostCache = prototype.build();
for (int i = 0; i < NUMBER_OF_QUERIES; ++i) {
DataSegment segment = createSegment(random.nextInt((int)TimeUnit.DAYS.toHours(DAYS_IN_MONTH)));
segmentQueries.add(segment);
}

System.out.println("GENERATING SEGMENTS : " + NUMBER_OF_SEGMENTS + " / " + NUMBER_OF_QUERIES);
}

@Benchmark
public double measureCostStrategy() throws InterruptedException
{
double cost = 0.0;
for (DataSegment segment : segmentQueries) {
cost += CostBalancerStrategy.computeJointSegmentsCost(segment, segments);
}
return cost;
}

@Benchmark
public double measureCachingCostStrategy() throws InterruptedException
{
double cost = 0.0;
for (DataSegment segment : segmentQueries) {
cost += segmentsCostCache.cost(segment);
}
return cost;
}

private DataSegment createSegment(int shift)
{
return new DataSegment(
"dataSource",
new Interval(referenceTime.plusHours(shift), referenceTime.plusHours(shift).plusHours(1)),
"version",
Collections.<String, Object>emptyMap(),
Collections.<String>emptyList(),
Collections.<String>emptyList(),
null,
0,
100
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void registerSegmentCallback(
}

@Override
public void registerServerCallback(Executor exec, ServerView.ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback)
{
// No-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);

private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

public AbstractCuratorServerInventoryView(
Expand Down Expand Up @@ -210,7 +210,7 @@ public Iterable<DruidServer> getInventory()
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverCallbacks.put(callback, exec);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void run()

protected void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/io/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ public CallbackAction segmentViewInitialized()
segmentFilter
);

baseView.registerServerCallback(
baseView.registerServerRemovedCallback(
exec,
new ServerView.ServerCallback()
new ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
Expand Down Expand Up @@ -307,9 +307,9 @@ public <T> QueryRunner<T> getQueryRunner(DruidServer server)
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
baseView.registerServerCallback(exec, callback);
baseView.registerServerRemovedCallback(exec, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ public ServerView.CallbackAction segmentViewInitialized()
}
);

baseView.registerServerCallback(
baseView.registerServerRemovedCallback(
exec,
new ServerView.ServerCallback()
new ServerView.ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public void registerSegmentCallback(
Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
);

public void registerServerCallback(Executor exec, ServerView.ServerCallback callback);
public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer

private final LifecycleLock lifecycleLock = new LifecycleLock();

private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
Expand Down Expand Up @@ -258,7 +258,7 @@ public void registerSegmentCallback(
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverCallbacks.put(callback, exec);
}
Expand Down Expand Up @@ -327,7 +327,7 @@ public void run()

private void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/io/druid/client/ServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface ServerView
{
public void registerServerCallback(Executor exec, ServerCallback callback);
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback);
public void registerSegmentCallback(Executor exec, SegmentCallback callback);

public enum CallbackAction
Expand All @@ -37,7 +37,7 @@ public enum CallbackAction
UNREGISTER,
}

public static interface ServerCallback
public static interface ServerRemovedCallback
{
/**
* Called when a server is removed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.server.coordinator.cost.ClusterCostCache;
import io.druid.timeline.DataSegment;

import java.util.Set;


public class CachingCostBalancerStrategy extends CostBalancerStrategy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CachingCostBalancerStrategy is the same algorithm as CostBalancerStrategy, so I suggest just to "add caching" to "CostBalancerStrategy", not introduce another configuration option.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only reason to introduce another configuration option is that CachingCostBalancerStrategy has bigger memory footprint that CostBalancerStrategy. Not sure that all users are ok with such change

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much extra memory it would take on coordinator in our prod cluster?

{

private final ClusterCostCache clusterCostCache;

public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningExecutorService exec)
{
super(exec);
this.clusterCostCache = Preconditions.checkNotNull(clusterCostCache);
}

@Override
protected double computeCost(
DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer
)
{
final long proposalSegmentSize = proposalSegment.getSize();

// (optional) Don't include server if it is already serving segment
if (!includeCurrentServer && server.isServingSegment(proposalSegment)) {
return Double.POSITIVE_INFINITY;
}

// Don't calculate cost if the server doesn't have enough space or is loading the segment
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
return Double.POSITIVE_INFINITY;
}

final String serverName = server.getServer().getName();

double cost = clusterCostCache.computeCost(serverName, proposalSegment);

// add segments that will be loaded to the cost
cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment);

return cost;
}

private ClusterCostCache costCacheForLoadingSegments(ServerHolder server)
{
final Set<DataSegment> loadingSegments = server.getPeon().getSegmentsToLoad();
return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build();
}

}
Loading