Adapt maxSegmentsToMove based on cluster skew #14584

Merged: 20 commits into apache:master, Aug 17, 2023

Conversation

kfaraz
Contributor

@kfaraz kfaraz commented Jul 14, 2023

Motivation

What is usage skew?

Usage skew between historicals in a tier refers to a difference in terms of:

  1. overall disk usage
  2. number of segments of each datasource
  3. total number of segments (already covered by 2)
  4. cost of placing a segment (difficult to quantify as it depends on datasource and interval of the segment)

Why is skew bad?

  • Queries may perform poorly.
  • Historicals may run out of memory.
  • Historicals may run out of disk space.

Current methods of mitigation

| Method | Cons |
| --- | --- |
| Let the cluster balance itself. | This may take up to several hours. This is because clusters typically have a low value of maxSegmentsToMove and the Coordinator looks only at small samples of segments to determine what to move in each cycle. These samples often contain segments that do not even need to be moved, and the Coordinator skips moving them, thus wasting cycles. |
| Use a high value of maxSegmentsToMove. | A value of maxSegmentsToMove that takes care of all scenarios would have to be very high and might lead to unnecessary moves when the cluster is already balanced. |
| Increase and decrease maxSegmentsToMove manually when required. | Requires a lot of timely user intervention. |

Proposed solution

Compute maxSegmentsToMove dynamically based on the current cluster state.

Definitions

  • maxSegmentsToMove: Maximum number of segments allowed to be moved in a coordinator run.
    This limit is to ensure that the coordinator:
    • does not get stuck doing too many cost computations in every run
    • does not perform too many moves when the cluster is already well-balanced
  • segmentsToMove: Number of segments sampled from all the loaded and loading segments that are considered for moving. The BalancerStrategy is consulted for these segments to find the optimal server to place them. It may also be that a segment is already optimally placed and is thus not moved at all.

Existing implementation

When smartSegmentLoading is disabled

  • Use the value of maxSegmentsToMove as configured by the user in coordinator dynamic config
  • segmentsToMove = maxSegmentsToMove

When smartSegmentLoading is enabled

  • Calculate maxSegmentsToMove = Math.min(1000, 2% of numUsedSegments)
  • segmentsToMove = maxSegmentsToMove

Proposed implementation

  • No change in behaviour if smartSegmentLoading is disabled
  • If smartSegmentLoading is enabled
    • Compute balancerComputeThreads based on numUsedSegments
    • Compute maxSegmentsToMove based on balancerComputeThreads
    • Compute segmentsToMoveToFixSkew based on difference in disk usage and number of segments
    • Compute segmentsToMove = Math.min(maxSegmentsToMove, segmentsToMoveToFixSkew)

See Calculations for details of how each parameter is computed.

Limits

  • 1 <= balancerComputeThreads <= 8
  • maxSegmentsToMove <= 20% of total segments
  • minSegmentsToMove = 0.6% of total segments

Classes to review

  • SegmentsToMoveCalculator
  • BalanceSegments
  • TierSegmentBalancer

Calculations

balancerComputeThreads

Def: Number of threads used by balancing executor to perform cost computations.

Calculated based on the number of used segments in the cluster. The number of used segments is used as a proxy for the total projected segments in the cluster as these are the ones which actually participate in balancing cost computation.

The number of threads increases in increasing step sizes.
The current max value is 8 threads.

| Used segments | balancerComputeThreads |
| --- | --- |
| 0 to 50,000 | 1 |
| 50,000 to 100,000 | 2 |
| 100,000 to 175,000 | 3 |
| 175,000 to 250,000 | 4 |
| 250,000 to 350,000 | 5 |
| 350,000 to 450,000 | 6 |
| 450,000 to 600,000 | 7 |
| 600,000 and above | 8 |
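A minimal sketch of this step function, with thresholds taken from the table above (the class and method names here are illustrative only, not the exact ones in the PR):

```java
// Sketch of the thread-count step function from the table above.
// Thresholds come from the table; names are illustrative only.
public class BalancerThreadsSketch
{
  private static final int[] SEGMENT_THRESHOLDS = {
      50_000, 100_000, 175_000, 250_000, 350_000, 450_000, 600_000
  };

  public static int computeBalancerThreads(int numUsedSegments)
  {
    for (int i = 0; i < SEGMENT_THRESHOLDS.length; i++) {
      if (numUsedSegments < SEGMENT_THRESHOLDS[i]) {
        return i + 1; // each band adds one thread
      }
    }
    return 8; // 600,000 segments and above: capped at 8 threads
  }
}
```

For example, 200,000 used segments fall in the 175,000 to 250,000 band and get 4 threads, matching the table.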

maxComputationsPerThread

Def: Maximum number of computations that can be performed on each thread in a single coordinator run.

1 computation = calculation of joint cost of 1 pair of segments

As established in #14484 , 4 balancer compute threads are able to perform cost computations for 1000 moving segments in a cluster of 1 million total segments in about 5s. Thus,

    total computations with 4 threads in 5s = 1k * 1M = 1B
    total computations per thread in 5s = 250M
    total computations per thread in 40s = 2B

maxComputationsPerThread (per coordinator cycle) = ~2B

// This still leaves us a buffer of 20s in the typical coordinator time of 1 min

maxSegmentsToMove

Def: Maximum number of segments allowed to be moved in a tier in a coordinator run.

totalSegments = all segments in cluster that would participate in cost computations
totalSegments = all loaded and queued (loading, moving, dropping) segments across all historicals

totalComputations = maxSegmentsToMove * totalSegments = numThreads * maxComputationsPerThread

maxSegmentsToMove = (numThreads * maxComputationsPerThread) / totalSegments
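The formula above can be sketched as follows. The 2B constant is maxComputationsPerThread and the 20% cap comes from the Limits section; the actual implementation rounds these values, so the figures in the table further below differ slightly from this sketch.

```java
// Sketch of the maxSegmentsToMove formula above. Constant and cap are from
// the PR description; the real code rounds these values differently.
public class MaxSegmentsToMoveSketch
{
  private static final long MAX_COMPUTATIONS_PER_THREAD = 2_000_000_000L;

  public static long computeMaxSegmentsToMove(long totalSegments, int numThreads)
  {
    if (totalSegments <= 0) {
      return 0;
    }
    // Segments we can afford to evaluate within the per-cycle compute budget
    long byComputeBudget = numThreads * MAX_COMPUTATIONS_PER_THREAD / totalSegments;
    // Limits section: maxSegmentsToMove <= 20% of total segments
    long cap = totalSegments / 5;
    return Math.min(byComputeBudget, cap);
  }
}
```

With 1M total segments and 8 threads this gives 16,000, matching the table; for small clusters the 20% cap dominates instead.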

minSegmentsToMove

Def: Minimum number of segments to consider for move so that the cluster is always balancing itself.

// Value increases by 100 for every 2^14 = ~16k segments
// i.e. minSegmentsToMove = ~0.6% of totalSegments
 minSegmentsToMove = Math.max(100, (totalSegments >> 14) * 100)

segmentsToMoveToFixSkew

diskUsage = projected disk usage of a historical in the tier
diff in diskUsage = max(diskUsage) - min(diskUsage)
countDiff1 = (max(diskUsage) - min(diskUsage)) / avgSegmentSize

for each datasource:
  countDiffForDatasource = max(numSegments) - min(numSegments)

countDiff2 = max(countDiffForDatasource)

segmentsToMoveToFixSkew = Math.max(countDiff1, countDiff2) / 2
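A self-contained sketch of this computation. ServerStats is a hypothetical stand-in for the per-historical stats the coordinator tracks, and the rationale for the final halving in the comment is my reading, not stated in the PR.

```java
import java.util.List;
import java.util.Map;

public class SkewSketch
{
  /** Hypothetical per-historical stats: projected disk usage and per-datasource segment counts. */
  public record ServerStats(long diskUsageBytes, Map<String, Integer> segmentCountsByDatasource) {}

  public static long computeSegmentsToMoveToFixSkew(List<ServerStats> servers, long avgSegmentSizeBytes)
  {
    // countDiff1: spread in projected disk usage, converted to a segment count
    long maxUsage = servers.stream().mapToLong(ServerStats::diskUsageBytes).max().orElse(0L);
    long minUsage = servers.stream().mapToLong(ServerStats::diskUsageBytes).min().orElse(0L);
    long countDiff1 = (maxUsage - minUsage) / avgSegmentSizeBytes;

    // countDiff2: worst per-datasource spread in segment counts across servers
    long countDiff2 = 0;
    List<String> datasources = servers.stream()
        .flatMap(s -> s.segmentCountsByDatasource().keySet().stream())
        .distinct()
        .toList();
    for (String datasource : datasources) {
      int max = servers.stream()
          .mapToInt(s -> s.segmentCountsByDatasource().getOrDefault(datasource, 0)).max().orElse(0);
      int min = servers.stream()
          .mapToInt(s -> s.segmentCountsByDatasource().getOrDefault(datasource, 0)).min().orElse(0);
      countDiff2 = Math.max(countDiff2, max - min);
    }

    // Halved: presumably because each move takes a segment off a high server
    // and puts it on a low one, closing the gap by 2.
    return Math.max(countDiff1, countDiff2) / 2;
  }
}
```

For example, two servers at 100,000 and 60,000 usage units with 120 vs 20 segments of one datasource and an average segment size of 1,000 give countDiff1 = 40, countDiff2 = 100, and a result of 50.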

segmentsToMove

Def: Number of segments finally considered for moving between active servers in a tier.

segmentsToMove = Math.max(minSegmentsToMove, Math.min(maxSegmentsToMove, segmentsToMoveToFixSkew))

Note: If there are decommissioning servers in a tier, the above computations are modified slightly to prioritize move from decommissioning servers.
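As a quick check of the clamping formula above, here is an illustrative helper (not the PR's code) exercised with numbers from the 100,000-segment row of the table below (minSegmentsToMove = 100, maxSegmentsToMove = 19,500):

```java
// Final clamp: skew-driven demand, bounded below by minSegmentsToMove
// and above by maxSegmentsToMove.
public class SegmentsToMoveClamp
{
  public static long computeSegmentsToMove(long minToMove, long maxToMove, long toFixSkew)
  {
    return Math.max(minToMove, Math.min(maxToMove, toFixSkew));
  }
}
```

A skew of 50 is raised to the floor of 100, a skew of 5,000 passes through unchanged, and a skew of 50,000 is capped at 19,500.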

Computed balancerComputeThreads and maxSegmentsToMove

| Used segments | balancerComputeThreads | maxSegmentsToMove (with 1 replica per segment) | minSegmentsToMove (with 1 replica per segment) |
| --- | --- | --- | --- |
| 1,000 | 1 | 125 | 100 |
| 10,000 | 1 | 1,900 | 100 |
| 50,000 | 2 | 9,700 | 100 |
| 100,000 | 3 | 19,500 | 100 |
| 200,000 | 4 | 39,000 | 300 |
| 300,000 | 5 | 34,000 | 400 |
| 400,000 | 6 | 31,000 | 600 |
| 500,000 | 7 | 29,000 | 700 |
| 1,000,000 | 8 | 16,000 | 1,500 |
| 2,000,000 | 8 | 8,000 | 3,000 |
| 10,000,000 | 8 | 1,000 | 15,200 |

Assumptions

These assumptions always apply when using cost balancer strategy and are not specific to the implementation proposed here.

  1. All historicals in a tier have the same amount of maximum disk space and on-heap memory.
  2. Segments of same granularity have similar cost. In other words, all DAY segments are considered to occupy nearly the same amount of disk space.
  3. Segments of coarser granularity have higher cost, e.g. the cost of a MONTH segment is roughly 30x the cost of a DAY segment.

Release note

Adjust balancerComputeThreads and maxSegmentsToMove automatically based on usage skew between the historicals in a tier. An imbalanced cluster now rectifies itself much faster without requiring any user intervention.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz added this to the 27.0 milestone Jul 14, 2023
@kfaraz kfaraz removed this from the 27.0 milestone Jul 14, 2023
@kfaraz kfaraz closed this Aug 3, 2023
@kfaraz kfaraz reopened this Aug 3, 2023
@clintropolis clintropolis (Member) left a comment

🤘 this seems like it should be pretty cool to have, has been on my wish list for some time now

Comment on lines 96 to 103
public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
{
// Divide by 2^14 and multiply by 100 so that the value increases
// in steps of 100 for every 2^14 = ~16k segments
int upperBound = (totalSegmentsInTier >> 14) * 100;
int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
return Math.max(lowerBound, upperBound);
}
Member

i'm a little bit conflicted/worried about this for large clusters, on the one hand it is good to consider a larger number of segments to seek to achieve balance, but iirc data transfer costs from deep storage while super cheap are not actually free, so moves that barely make a difference could end up causing needless churn.

I wonder if we need some sort of "inertia" threshold where if the cost of moving the segments isn't "better enough" we skip moving the segment. That way we can still consider a larger number of segments, but not necessarily make every move. I suppose something like this would be balancer specific since it really probably only makes sense for cost balancer, but still might be worth considering.

@kfaraz kfaraz (Contributor Author) Aug 15, 2023

Yeah, that would make sense. The only problem is that such a threshold would have to be dynamic too. A cluster with fewer segments results in smaller costs. So the threshold would have to increase as the number of segments in the cluster increases.

As a compromise, I can reduce the minSegmentsToMove even further.

Although, I tested these changes on a cluster with 900k segments and minSegmentsToMove was about 6500. The good thing is that, even out of 6500, the cluster would decide to move only about 100 segments as the rest were already well placed.

[Screenshot attached: 2023-08-15]

Member

i think the only really shocking value was when I saw this test:

    Assert.assertEquals(61_000, computeMinSegmentsToMove(10_000_000));

Assuming that it actually moved that many segments and my math is correct, it's a bit less than $30 a day just in S3 GET object requests, assuming the cluster is in the same region as the bucket 😅 .. but I guess with a cluster that big this is probably a tiny fraction of total spend so maybe it's not a big deal.

My casual observation is that very large clusters with hundreds of data nodes usually make most of the moves when given the option, assuming new segments are continuously being created in a relatively large number, because there is frequently a slightly better server to place a given segment since the last time it was sampled. I don't have any data to prove this other than gut feeling though. I would also believe that maybe this is the case because i haven't actually seen a cluster that big trying to move segments this aggressively to give it a chance of ever converging on some stable state, so of course its always going to move stuff when only looking at tens to hundreds of segments at a time out of millions at a rate slower than new segments are created (and old ones dropped).

Another idea is that we could level off the min scaling as the total number of segments gets very high? like make the steps smaller or something?

However, I'm ok if we want to give this a shot as it is too, maybe we could advise in release notes that people with huge clusters might want to keep a careful watch on stuff to make sure things are actually chill.

@kfaraz kfaraz (Contributor Author) Aug 17, 2023

Yeah, that's a valid point, the costs could be pretty high for large clusters if they end up moving all the eligible segments in every cycle. Those tests are a little misleading as they are just verifying the value computed by the computeMinSegmentsToMove method. With larger clusters, the maxSegmentsToMove turns out to be the determining factor in practice. This is because the balancerComputeThreads has been capped at 8 and thus the maxSegmentsToMove falls off pretty fast. I have attached a table in the PR description for reference.

[Screenshot attached: 2023-08-17]

@kfaraz kfaraz (Contributor Author) Aug 17, 2023

So with 2M total segments, we will never move more than 8k segments. With 10M, only 1k and so on.

Contributor Author

Although, I agree that minSegmentsToMove can probably still be lower because we are already handling cases of skew and increasing the maxSegmentsToMove when needed. So in stable state, the minSegmentsToMove can be even lower.

For now, I am going to reduce it 4-fold, i.e. from 0.6% to 0.15%. We can improve upon this value if needed when we have seen some prod clusters running this.

Comment on lines +255 to +258
private static int computeSegmentsToMoveToBalanceDiskUsage(
String tier,
List<ServerHolder> servers
)
Member

i sort of wonder if this runs into conflicts with cost balancer which primarily operates on interval adjacency instead of disk size for datasources where the segment sizes are not very homogenous. I imagine it might, but also its probably not that harmful because in the worst case it just considers more segments to move than it needed to.

Contributor Author

Yeah, this logic is just to determine the number of segments we would like to move, while being agnostic to the actual balancer strategy employed.
(That's not entirely true since we do calculate the total time taken and thus maxSegmentsToMove based on time taken by cost strategy in a single computation. But that just means that any other existing or future strategies need to be faster than cost.)

Given that all historicals in a tier have the same amount of maximum disk space, we should be good. I imagine there are not many clusters out there that use heterogeneous historicals. If there are, we would have to rely on percentages but then again, that percentage would need to be somehow translated to bytes.

*
* @return Number of {@code balancerComputeThreads} in the range [1, 8].
*/
public static int computeNumBalancerThreads(int numUsedSegments)
Contributor

Nit: would it be better to consider the number of replicas rather than the number of used segments in cases where a lot of segments have 0 replicas in the load rule?

Contributor Author

The Javadoc calls out that we are using numUsedSegments as a proxy for the number of replicas. If we count the number of replicas at this point, we might under-estimate the number of segments, as some unavailable segments might not have been assigned yet (since this runs before the RunRules duty). This might lead to too few threads.

Contributor

My understanding was that cost computation complexity is a function of the number of loaded replicas when smart segment loading is used, and not of the total number of segments.

Thanks for the clarification that the intent was because it's a good proxy in general


@kfaraz kfaraz merged commit 5d4ac64 into apache:master Aug 17, 2023
74 checks passed
@kfaraz kfaraz deleted the adaptive_max_segments_to_move branch August 17, 2023 05:44
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023