Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CachingCostBalancerStrategy #61

Closed
wants to merge 23 commits into from
Closed

Conversation

dgolitsyn
Copy link

@dgolitsyn dgolitsyn commented May 30, 2017

CachingCostBalancerStrategy is fast implementation of CostBalancerStrategy. Computation algorithm slightly differs from the original version and allows pre-compute and cache cost function values to reduce complexity of the algorithm.

Motivation:

  1. Current implementation is very complex computational task and calculation time grows linearly with number of segments in the cluster. For rather big cluster one druid balancer iteration could take tens of minutes
  2. If large number of segments need to be rebalanced, cost function calculation could take hours or even days (if number of historical nodes changed for instance)
  3. Total server cost could not be calculated due to high computational complexity and thus could not be included in balancing algorithm

Implementation details:

  1. Result numbers are not equal for CachingCostBalancerStrategy and CostBalancerStrategy, but decisions made are the same, meaning that the same server has been chosen for particular segment by two strategies (has been tested for ~100k segments from our cluster)
  2. CachingCostBalancerStrategy is less memory efficient and requires ~100mb heap for each million of segments
  3. Computation time is ~20000x faster (depends on segment distribution and total number of servers and segments)

Benchmark

                           Mode  Cnt     Score     Error  Units
cachingCostStrategy       avgt   20     0.238 ±  0.003  ms/op
costStrategy              avgt   20  4549.247 ± 26.430  ms/op

Random random = new Random(seed);
SegmentCostCache.Builder prototype = SegmentCostCache.builder();
for (int i = 0; i < NUMBER_OF_SEGMENTS; ++i) {
DataSegment segment = createSegment(random.nextInt(30 * 24));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that number of days in a month * number of hours in a day? Please leave a comment

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

this.clusterCostCache = Preconditions.checkNotNull(clusterCostCache);
}

protected double computeCost(DataSegment proposalSegment, ServerHolder serverHolder)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested to inline this method, to avoid unnecessary method overloading

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, fixed

@LifecycleStart
public void start()
{
if (!executorRef.compareAndSet(null, Execs.singleThreaded("CachingCostBalancerStrategy-executor"))) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem that you need AtomicReference and CAS here, it is not switched back to null anywhere, although part of message "or has been stopped" below means that it was planned.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it was planned to be set to null in stop method after shutdownNow is called for current executor

if (!executorRef.compareAndSet(null, Execs.singleThreaded("CachingCostBalancerStrategy-executor"))) {
throw new ISE("CachingCostBalancerStrategyFactory is already started or has been stopped");
}
if (!started.compareAndSet(false, true)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use lifecycleLock?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address or comment on this

}
serverInventoryView.registerSegmentCallback(
executorRef.get(),
new ServerView.SegmentCallback()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this callback may be called in multiple "cycles", this whole construction seems to be racy. Then it need to call clusterCostCacheBuilder.build() in segmentViewInitialized() and save it. For future use in createBalancerStrategy().

Or maybe this factory doesn't create own callbacks, but uses already build server/segment view from CoordinatorServerView directly in createBalancerStrategy()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, that understand this comment correctly. What do you think is racy?
All interactions with clusterCostCacheBuilder are done via executor which is singleThreaded. Created clusterCostCache is immutable and can be used by CachingCostBalancerStrategy in multiple threads safely.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below

{
return new Builder();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra empty line

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix

public void addSegment(DataSegment dataSegment)
{
allSegmentsCostCache.addSegment(dataSegment);
segmentsPerDataSource.computeIfAbsent(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO formatting like

      segmentsPerDataSource
          .computeIfAbsent(dataSegment.getDataSource(), d -> SegmentCostCache.builder())
          .addSegment(dataSegment);

is more readable

{
allSegmentsCostCache.removeSegement(dataSegment);
SegmentCostCache.Builder builder = segmentsPerDataSource.get(dataSegment.getDataSource());
builder.removeSegement(dataSegment);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check not null?

public void removeSegment(DataSegment dataSegment)
{
int index = -1;
ListIterator<DataSegment> it = dataSegments.listIterator();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define it in the loop scope. Also it's type don't need to be ListIterator, could be just iterator.

}

if (index < 0) {
throw new ISE("Failed to remove segment from bucket: segment not found");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ISE? In other similar places absence is considered normal

}
segmentsCostCache = prototype.build();
for (int i = 0; i < NUMBER_OF_QUERIES; ++i) {
DataSegment segment = createSegment(random.nextInt(30 * 24));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

public ServerView.CallbackAction segmentViewInitialized()
{
initialized = true;
clusterCostCacheBuilder = ClusterCostCache.builder();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line doesn't solve the problem, because you always construct CachingCostBalancerStrategy from an empty builder.

}
serverInventoryView.registerSegmentCallback(
executorRef.get(),
new ServerView.SegmentCallback()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below

if (!executorRef.compareAndSet(null, Execs.singleThreaded("CachingCostBalancerStrategy-executor"))) {
throw new ISE("CachingCostBalancerStrategyFactory is already started or has been stopped");
}
if (!started.compareAndSet(false, true)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address or comment on this

{
return new Builder();
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix

@leventov
Copy link

Also @dgolitsyn please review how does it relate to apache#3485

import io.druid.timeline.DataSegment;


public class CachingCostBalancerStrategy extends CostBalancerStrategy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CachingCostBalancerStrategy is the same algorithm as CostBalancerStrategy, so I suggest just to "add caching" to "CostBalancerStrategy", not introduce another configuration option.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only reason to introduce another configuration option is that CachingCostBalancerStrategy has bigger memory footprint that CostBalancerStrategy. Not sure that all users are ok with such change

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much extra memory it would take on coordinator in our prod cluster?

private static final EmittingLogger log = new EmittingLogger(CachingCostBalancerStrategyFactory.class);

private final ServerInventoryView serverInventoryView;
private final Object lifecycleLock = new Object();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use LifecycleLock class

}
if (initialized) {
try {
CompletableFuture<CachingCostBalancerStrategy> future = CompletableFuture.supplyAsync(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about replacing all internal maps in ClusterCostCache.Builder with ConcurrentHashMap, and allow concurrent CachingCostBalancerStrategy construction in the same thread as createBalancerStrategy() is called? It simplify code and allow to remove a exception handling here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will require to make ServerCostCache thread-safe as well as SegmentsCostCache. I think that the overall overhead could be significant, so my vote is to keep it as is.

Copy link

@leventov leventov Jul 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's updated only from Curator, likely from a single thread, so I don't think it will actually make any performance difference. OTOH it allows to avoid nasty threading and error-catching logic in the factory.


public class SegmentsCostCache
{
private static final double HALF_LIFE = 1.0; // cost function half-life in hours

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add code comments, a lot of comments all over this class. See my previous PR comments


private Interval toBucketInterval(DataSegment segment)
{
long start = segment.getInterval().getStartMillis() - (segment.getInterval().getStartMillis() % BUCKET_INTERVAL);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Granularity has logic like this, could it be reused?

return interval;
}

public boolean inCalculationInterval(DataSegment dataSegment)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add tests for this method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests added


// avoid calculation for segments outside of LIFE_THRESHOLD
if (!inCalculationInterval(dataSegment)) {
return cost;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"This situation is already excluded on the level where Bucket.cost() is called, so throwing ISE seems more reasonable to me" - prev comment

}
// add left-non-overlapping segments
if (leftIndex >= 0) {
cost += leftSum[leftIndex] * (FastMath.exp(-t1) - FastMath.exp(-t0));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
// add all right-overlapping segments
int rightIndex = index;
while (rightIndex < sortedSegments.size() && sortedSegments.get(rightIndex)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break on &&

*
* Joint cost for two segments:
*
* cost(X, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any online service that allows to plot those formulas?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

segments.tailSet(valueAndSegment).forEach(v -> v.leftValue += leftValue);
segments.headSet(valueAndSegment).forEach(v -> v.rightValue += rightValue);

ValueAndSegment lower = segments.lower(valueAndSegment);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this? tailSet and headSet are already updated above

segments.tailSet(segmentAndValue).forEach(v -> v.leftValue += leftValue);
segments.headSet(segmentAndValue).forEach(v -> v.rightValue += rightValue);

SegmentAndValue lower = segments.lower(segmentAndValue);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this? tailSet and headSet already updated above

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments for this


static class SegmentAndValue implements Comparable<SegmentAndValue>
{
private final DataSegment dataSegment;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for sanity add equals() and hashCode() that compare the segment. Since we don't actually expect that the same segment is added, we may check

if (!segments.add(segmentAndValue)) {
  throw new ISE("expect new segment");
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

@leventov
Copy link

Closing as it is merged into upstream: apache#4731

@leventov leventov closed this Sep 28, 2017
@leventov leventov deleted the caching-cost-balancer branch September 28, 2017 19:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants