Adapt maxSegmentsToMove based on cluster skew #14584
Conversation
server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
```java
  ).sum();
  return Math.min(decommSegmentsToMove, maxSegmentsToMove);
} else {
  int maxPercentageToMove = dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();
```
Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation: `CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove`
🤘 this seems like it should be pretty cool to have, has been on my wish list for some time now
```java
public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
{
  // Divide by 2^14 and multiply by 100 so that the value increases
  // in steps of 100 for every 2^14 = ~16k segments
  int upperBound = (totalSegmentsInTier >> 14) * 100;
  int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
  return Math.max(lowerBound, upperBound);
}
```
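For reference, here is how that stepping plays out for a few tier sizes, assuming `MIN_SEGMENTS_TO_MOVE` is 100 (the constant's value is not shown in this excerpt, so that part is an assumption; the 10M value matches the test quoted below):

```java
// Worked examples of the step function above, assuming MIN_SEGMENTS_TO_MOVE = 100.
computeMinSegmentsToMoveInTier(50);          // 50     (tier smaller than the floor itself)
computeMinSegmentsToMoveInTier(10_000);      // 100    (below the first ~16k step)
computeMinSegmentsToMoveInTier(100_000);     // 600    ((100_000 >> 14) * 100 = 6 * 100)
computeMinSegmentsToMoveInTier(10_000_000);  // 61_000 ((10_000_000 >> 14) * 100 = 610 * 100)
```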
i'm a little bit conflicted/worried about this for large clusters. On the one hand it is good to consider a larger number of segments to seek to achieve balance, but iirc data transfer costs from deep storage, while super cheap, are not actually free, so moves that barely make a difference could end up causing needless churn.
I wonder if we need some sort of "inertia" threshold where, if the cost improvement from moving a segment isn't "better enough", we skip the move. That way we can still consider a larger number of segments, but not necessarily make every move. I suppose something like this would be balancer specific, since it really probably only makes sense for the cost balancer, but it still might be worth considering.
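To make the idea concrete, a minimal sketch of such an inertia check (not part of this PR; the constant and method names here are hypothetical):

```java
// Hypothetical "inertia" check: only move a segment if the best candidate
// server improves the cost by at least some relative threshold.
private static final double MIN_COST_IMPROVEMENT_RATIO = 0.01; // hypothetical 1% threshold

static boolean shouldMove(double currentCost, double bestCandidateCost)
{
  // Skip the move unless the candidate is meaningfully cheaper than staying put.
  return bestCandidateCost < currentCost * (1.0 - MIN_COST_IMPROVEMENT_RATIO);
}
```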
Yeah, that would make sense. The only problem is that such a threshold would have to be dynamic too. A cluster with fewer segments results in smaller costs, so the threshold would have to increase as the number of segments in the cluster increases.
As a compromise, I can reduce the `minSegmentsToMove` even further.
Although, I tested these changes on a cluster with 900k segments and `minSegmentsToMove` was about 6500. The good thing is that, even out of those 6500, the cluster would decide to move only about 100 segments as the rest were already well placed.
i think the only really shocking value was when I saw this test:
```java
Assert.assertEquals(61_000, computeMinSegmentsToMove(10_000_000));
```
Assuming that it actually moved that many segments and my math is correct, it's a bit less than $30 a day just in S3 GET object requests, assuming the cluster is in the same region as the bucket 😅 .. but I guess with a cluster that big this is probably a tiny fraction of total spend, so maybe it's not a big deal.
My casual observation is that very large clusters with hundreds of data nodes usually make most of the moves when given the option, assuming new segments are continuously being created in relatively large numbers, because there is frequently a slightly better server to place a given segment since the last time it was sampled. I don't have any data to prove this other than gut feeling though. It may also be that I haven't actually seen a cluster that big trying to move segments aggressively enough to give it a chance of ever converging on some stable state; of course it's always going to move stuff when it only looks at tens to hundreds of segments at a time out of millions, at a rate slower than new segments are created (and old ones dropped).
Another idea is that we could level off the min scaling as the total number of segments gets very high? Like make the steps smaller or something?
However, I'm ok if we want to give this a shot as it is too; maybe we could advise in the release notes that people with huge clusters might want to keep a careful watch on stuff to make sure things are actually chill.
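One possible shape for that leveling-off, purely as a sketch (the cutover point and shift amounts below are made up, not proposed values):

```java
// Hypothetical variant of computeMinSegmentsToMoveInTier that grows more
// slowly once a tier exceeds ~1M segments. All constants are illustrative.
static int computeMinSegmentsToMoveLeveled(int totalSegmentsInTier)
{
  if (totalSegmentsInTier <= 1_000_000) {
    // steps of 100 for every ~16k segments, as in the PR
    return (totalSegmentsInTier >> 14) * 100;
  }
  // beyond 1M segments, coarser steps of 100 for every ~65k segments
  int base = (1_000_000 >> 14) * 100;
  return base + ((totalSegmentsInTier - 1_000_000) >> 16) * 100;
}
```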
Yeah, that's a valid point, the costs could be pretty high for large clusters if they end up moving all the eligible segments in every cycle. Those tests are a little misleading as they are just verifying the value computed by the `computeMinSegmentsToMove` method. With larger clusters, `maxSegmentsToMove` turns out to be the determining factor in practice. This is because `balancerComputeThreads` has been capped at 8 and thus `maxSegmentsToMove` falls off pretty fast. I have attached a table in the PR description for reference.
So with 2M total segments, we will never move more than 8k segments. With 10M, only 1k and so on.
Although, I agree that `minSegmentsToMove` can probably still be lower because we are already handling cases of skew and increasing `maxSegmentsToMove` when needed. So in a stable state, `minSegmentsToMove` can be even lower.
For now, I am going to reduce it 4-fold, i.e. from 0.6% to 0.15%. We can improve upon this value if needed once we have seen some prod clusters running this.
```java
private static int computeSegmentsToMoveToBalanceDiskUsage(
    String tier,
    List<ServerHolder> servers
)
```
i sort of wonder if this runs into conflicts with the cost balancer, which primarily operates on interval adjacency instead of disk size, for datasources where the segment sizes are not very homogeneous. I imagine it might, but it's probably not that harmful because in the worst case it just considers more segments to move than it needed to.
Yeah, this logic is just to determine the number of segments we would like to move, while being agnostic to the actual balancer strategy employed.
(That's not entirely true, since we do calculate the total time taken, and thus `maxSegmentsToMove`, based on the time taken by the `cost` strategy in a single computation. But that just means that any other existing or future strategies need to be faster than `cost`.)
Given that all historicals in a tier have the same amount of maximum disk space, we should be good. I imagine there are not many clusters out there that use heterogeneous historicals. If there are, we would have to rely on percentages, but then again, that percentage would need to be somehow translated to bytes.
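For intuition, a simplified sketch of what a disk-usage-based calculation like `computeSegmentsToMoveToBalanceDiskUsage` might do: divide the excess bytes on the most-loaded server by the average segment size. This is not the PR's actual implementation, and the `ServerHolder` accessors used here are assumptions:

```java
import java.util.List;

// Simplified sketch: estimate how many segments must move to even out
// disk usage across the servers of a tier. Not the PR's actual logic.
static int estimateSegmentsToMoveForDiskBalance(List<ServerHolder> servers)
{
  if (servers.isEmpty()) {
    return 0;
  }
  long totalUsed = 0;
  long totalSegments = 0;
  long maxUsed = 0;
  for (ServerHolder server : servers) {
    long used = server.getSizeUsed();                      // assumed accessor
    totalUsed += used;
    totalSegments += server.getServer().getNumSegments();  // assumed accessor
    maxUsed = Math.max(maxUsed, used);
  }
  if (totalSegments == 0) {
    return 0;
  }
  long averageUsed = totalUsed / servers.size();
  long averageSegmentSize = totalUsed / totalSegments;
  // Excess bytes on the most-loaded server, expressed in average-sized segments.
  return (int) ((maxUsed - averageUsed) / Math.max(1, averageSegmentSize));
}
```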
```java
 *
 * @return Number of {@code balancerComputeThreads} in the range [1, 8].
 */
public static int computeNumBalancerThreads(int numUsedSegments)
```
Nit: would it be better to consider the number of replicas rather than the number of used segments, in cases where a lot of segments have 0 replicas in the load rule?
The javadoc calls out that we are using `numUsedSegments` as a proxy for the number of replicas. If we counted the number of replicas at this point, we might under-estimate the number of segments, as some unavailable segments might not have been assigned yet (since this runs before the `RunRules` duty). This might lead to too few threads.
My understanding was that cost computation complexity is a function of the number of loaded replicas when smart segment loading is used, not of the total number of segments.
Thanks for clarifying that the intent is to use it as a good proxy in general.
Motivation
What is usage skew?
Usage skew between historicals in a tier refers to a difference in terms of:
- disk usage
- number of segments
Why is skew bad?
Current methods of mitigation
- Balancing is limited by `maxSegmentsToMove`, and the Coordinator looks only at small samples of segments to determine what to move in each cycle. These samples often contain segments that do not even need to be moved, and the Coordinator skips moving them, thus wasting cycles.
- Fixing a skewed cluster faster requires increasing `maxSegmentsToMove`.
- A single value of `maxSegmentsToMove` that takes care of all scenarios would have to be very high and might lead to unnecessary moves when the cluster is already balanced.
- Cluster operators have to tune `maxSegmentsToMove` manually when required.

Proposed solution
Compute `maxSegmentsToMove` dynamically based on the current cluster state.

Definitions
- `maxSegmentsToMove`: Maximum number of segments allowed to be moved in a coordinator run. This limit is to ensure that the coordinator does not spend too long on cost computations in a single run.
- `segmentsToMove`: Number of segments sampled from all the loaded and loading segments that are considered for moving. The `BalancerStrategy` is consulted for these segments to find the optimal server to place them. It may also be that a segment is already optimally placed and is thus not moved at all.

Existing implementation
When `smartSegmentLoading` is disabled:
- `maxSegmentsToMove` is as configured by the user in the coordinator dynamic config
- `segmentsToMove = maxSegmentsToMove`

When `smartSegmentLoading` is enabled:
- `maxSegmentsToMove = Math.min(1000, 2% of numUsedSegments)`
- `segmentsToMove = maxSegmentsToMove`
Proposed implementation
smartSegmentLoading
is disabledsmartSegmentLoading
is enabledbalancerComputeThreads
based onnumUsedSegments
maxSegmentsToMove
based onbalancerComputeThreads
segmentsToMoveToFixSkew
based on difference in disk usage and number of segmentssegmentsToMove = Math.min(maxSegmentsToMove, segmentsToMoveToFixSkew)
See Calculations for details of how each parameter is computed.
Limits
- `balancerComputeThreads` <= 8
- `maxSegmentsToMove` <= 20% of total segments
- `minSegmentsToMove` = 0.6% of total segments

Classes to review
- `SegmentToMoveCalculator`
- `BalanceSegments`
- `TierSegmentBalancer`
Calculations

balancerComputeThreads
Def: Number of threads used by the balancing executor to perform cost computations.
Calculated based on the number of used segments in the cluster. The number of used segments is used as a proxy for the total projected segments in the cluster, as these are the ones which actually participate in balancing cost computations.
The number of threads increases in increasing step sizes. The current max value is 8 threads.
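A sketch of what such a step function could look like (the thresholds below are illustrative, not the PR's exact values; only the [1, 8] range and the "increasing step sizes" behavior come from the description above):

```java
// Hypothetical thresholds: thread count grows in increasing step sizes
// with the number of used segments, capped at 8.
static int computeNumBalancerThreads(int numUsedSegments)
{
  if (numUsedSegments < 50_000) {
    return 1;
  } else if (numUsedSegments < 200_000) {
    return 2;
  } else if (numUsedSegments < 500_000) {
    return 4;
  } else if (numUsedSegments < 1_000_000) {
    return 6;
  }
  return 8;
}
```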
maxComputationsPerThread
Def: Maximum number of computations that can be performed on each thread in a single coordinator run.
1 computation = calculation of the joint cost of 1 pair of segments
As established in #14484, 4 balancer compute threads are able to perform cost computations for 1000 moving segments in a cluster of 1 million total segments in about 5 seconds. Thus, a single thread can perform roughly (1000 × 1,000,000) / 4 = 250 million cost computations in about 5 seconds.
maxSegmentsToMove
Def: Maximum number of segments allowed to be moved in a tier in a coordinator run.
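Given the per-thread budget above, the cap plausibly falls out of simple arithmetic; a sketch under stated assumptions (the budget constant and the 1-computation-per-existing-segment cost model are taken from the #14484 figure above, but this is not necessarily the PR's exact formula):

```java
// Sketch: cap moves so that (segmentsToMove * totalSegments) cost
// computations fit within the per-run budget of all balancer threads.
static final long MAX_COMPUTATIONS_PER_THREAD = 250_000_000L; // from the 250M-in-5s figure

static int computeMaxSegmentsToMove(int totalSegments, int balancerComputeThreads)
{
  long totalComputationBudget = balancerComputeThreads * MAX_COMPUTATIONS_PER_THREAD;
  // Each candidate move costs roughly one joint-cost computation per
  // existing segment, so the number of affordable moves is budget / totalSegments.
  long maxMoves = totalComputationBudget / Math.max(1, totalSegments);
  // Also capped at 20% of total segments, per the Limits section.
  return (int) Math.min(maxMoves, totalSegments / 5);
}
```

With 4 threads and 1M total segments this yields 1000 moves, matching the #14484 data point.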
minSegmentsToMove
Def: Minimum number of segments to consider for moving so that the cluster is always balancing itself.
segmentsToMoveToFixSkew
Def: Number of segments that need to be moved to fix the skew in disk usage and number of segments between the historicals in a tier.
segmentsToMove
Def: Number of segments finally considered for moving between active servers in a tier.
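Putting the definitions together, the final value plausibly comes from clamping; a sketch (the `Math.min` with `segmentsToMoveToFixSkew` is stated in the proposal above, while the `Math.max` floor with `minSegmentsToMove` is an assumption based on its definition):

```java
// Sketch: consider at least minSegmentsToMove segments so the cluster keeps
// balancing itself, but never exceed the computation-bounded maximum.
int segmentsToMove = Math.min(
    maxSegmentsToMove,
    Math.max(minSegmentsToMove, segmentsToMoveToFixSkew)
);
```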
Computed `balancerComputeThreads` and `maxSegmentsToMove`
(Table omitted: for various cluster sizes, it listed the computed `balancerComputeThreads`, `maxSegmentsToMove` with 1 replica per segment, and `minSegmentsToMove` with 1 replica per segment.)
Assumptions
These assumptions always apply when using the `cost` balancer strategy and are not specific to the implementation proposed here.

Release note
Adjust `balancerComputeThreads` and `maxSegmentsToMove` automatically based on usage skew between the historicals in a tier. An imbalanced cluster now rectifies itself much faster without requiring any user intervention.

This PR has: