Skip to content

Commit

Permalink
Increase the computed value of replicationThrottleLimit (#14913)
Browse files Browse the repository at this point in the history
Changes
- Increase value of `replicationThrottleLimit` computed by `smartSegmentLoading` from
2% to 5% of total number of used segments.
- Assign replicas to a tier even when some replicas are already being loaded in that tier
- Limit the total number of replicas in load queue at start of run + replica assignments in
the run to the `replicationThrottleLimit`.

i.e. for every tier,
    num loading replicas at start of run + num replicas assigned in run <= replicationThrottleLimit
  • Loading branch information
kfaraz authored Aug 28, 2023
1 parent 9fcbf05 commit d6565f4
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 34 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ Druid computes the values to optimize Coordinator performance, based on the curr
|--------|--------------|-----------|
|`useRoundRobinSegmentAssignment`|true|Speeds up segment assignment.|
|`maxSegmentsInNodeLoadingQueue`|0|Removes the limit on load queue size.|
|`replicationThrottleLimit`|2% of used segments, minimum value 100|Prevents aggressive replication when a historical disappears only intermittently.|
|`replicationThrottleLimit`|5% of used segments, minimum value 100|Prevents aggressive replication when a historical disappears only intermittently.|
|`replicantLifetime`|60|Allows segments to wait about an hour (assuming a Coordinator period of 1 minute) in the load queue before an alert is raised. In `smartSegmentLoading` mode, load queues are not limited by size. Segments might therefore be assigned to a load queue even if the corresponding server is slow to load them.|
|`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This throttling is already handled by `replicationThrottleLimit`.|
|`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value 1000|Ensures that some segments are always moving in the cluster to keep it well balanced. The maximum value keeps the Coordinator run times bounded.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

package org.apache.druid.server.coordinator.loading;

import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The ReplicationThrottler is used to throttle the number of segment replicas
Expand All @@ -43,52 +41,55 @@ public class ReplicationThrottler
private final int replicationThrottleLimit;
private final int maxReplicaAssignmentsInRun;

private final Map<String, Integer> tierToNumAssigned = new HashMap<>();
private final Set<String> tiersLoadingReplicas = new HashSet<>();
private final Object2IntOpenHashMap<String> tierToNumAssigned = new Object2IntOpenHashMap<>();
private final Object2IntOpenHashMap<String> tierToMaxAssignments = new Object2IntOpenHashMap<>();

private int totalReplicasAssignedInRun;

/**
* Creates a new ReplicationThrottler for use during a single coordinator run.
* The number of replicas loading on a tier must always be within the current
* {@code replicationThrottleLimit}. Thus, if a tier was already loading {@code k}
* replicas at the start of a coordinator run, it may be assigned only
* {@code replicationThrottleLimit - k} more replicas during the run.
*
* @param tiersLoadingReplicas Set of tier names which are already loading
* replicas and will not be eligible for loading
* more replicas in this run.
* @param tierToLoadingReplicaCount Map from tier name to number of replicas
* already being loaded.
* @param replicationThrottleLimit Maximum number of replicas that can be
* assigned to a single tier in the current run.
* @param maxReplicaAssignmentsInRun Max number of total replicas that can be
* assigned across all tiers in the current run.
*/
public ReplicationThrottler(
Set<String> tiersLoadingReplicas,
Map<String, Integer> tierToLoadingReplicaCount,
int replicationThrottleLimit,
int maxReplicaAssignmentsInRun
)
{
this.replicationThrottleLimit = replicationThrottleLimit;
this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
this.totalReplicasAssignedInRun = 0;
if (tiersLoadingReplicas != null) {
this.tiersLoadingReplicas.addAll(tiersLoadingReplicas);
}
}

public boolean isTierLoadingReplicas(String tier)
{
return tiersLoadingReplicas.contains(tier);
if (tierToLoadingReplicaCount != null) {
tierToLoadingReplicaCount.forEach(
(tier, numLoadingReplicas) -> tierToMaxAssignments.addTo(
tier,
Math.max(0, replicationThrottleLimit - numLoadingReplicas)
)
);
}
}

public boolean canAssignReplica(String tier)
public boolean isReplicationThrottledForTier(String tier)
{
return !tiersLoadingReplicas.contains(tier)
&& totalReplicasAssignedInRun < maxReplicaAssignmentsInRun
&& tierToNumAssigned.computeIfAbsent(tier, t -> 0) < replicationThrottleLimit;
return tierToNumAssigned.getInt(tier) >= tierToMaxAssignments.getOrDefault(tier, replicationThrottleLimit)
|| totalReplicasAssignedInRun >= maxReplicaAssignmentsInRun;
}

public void incrementAssignedReplicas(String tier)
{
++totalReplicasAssignedInRun;
tierToNumAssigned.compute(tier, (t, count) -> (count == null) ? 1 : count + 1);
tierToNumAssigned.addTo(tier, 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig
{
if (dynamicConfig.isSmartSegmentLoading()) {
// Compute replicationThrottleLimit with a lower bound of 100
final int throttlePercentage = 2;
final int throttlePercentage = 5;
final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100);
log.info(
"Smart segment loading is enabled. Calculated replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ private int loadReplicas(
final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1;

// Do not assign replicas if tier is already busy loading some
if (isAlreadyLoadedOnTier && replicationThrottler.isTierLoadingReplicas(tier)) {
if (isAlreadyLoadedOnTier && replicationThrottler.isReplicationThrottledForTier(tier)) {
return 0;
}

Expand Down Expand Up @@ -543,7 +543,7 @@ private boolean loadSegment(DataSegment segment, ServerHolder server)
private boolean replicateSegment(DataSegment segment, ServerHolder server)
{
final String tier = server.getServer().getTier();
if (!replicationThrottler.canAssignReplica(tier)) {
if (replicationThrottler.isReplicationThrottledForTier(tier)) {
incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "Throttled replication", segment, tier);
return false;
}
Expand All @@ -563,22 +563,16 @@ private static ReplicationThrottler createReplicationThrottler(
SegmentLoadingConfig loadingConfig
)
{
final Set<String> tiersLoadingReplicas = new HashSet<>();
final Map<String, Integer> tierToLoadingReplicaCount = new HashMap<>();

cluster.getHistoricals().forEach(
(tier, historicals) -> {
int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum();
if (numLoadingReplicas > 0) {
log.info(
"Tier [%s] will not be assigned replicas as it is already loading [%d] replicas.",
tier, numLoadingReplicas
);
tiersLoadingReplicas.add(tier);
}
tierToLoadingReplicaCount.put(tier, numLoadingReplicas);
}
);
return new ReplicationThrottler(
tiersLoadingReplicas,
tierToLoadingReplicaCount,
loadingConfig.getReplicationThrottleLimit(),
loadingConfig.getMaxReplicaAssignmentsInRun()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.loading;

import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link ReplicationThrottler}, verifying that the number of replicas
 * assigned to a tier in a run, together with the replicas the tier was already
 * loading at the start of the run, never exceeds the replication throttle limit.
 */
public class ReplicationThrottlerTest
{
  private static final String TIER_1 = "t1";
  private static final String TIER_2 = "t2";

  /**
   * When no tier is loading replicas at the start of the run, every tier may be
   * assigned exactly {@code replicationThrottleLimit} replicas and is throttled
   * after that.
   */
  @Test
  public void testTierDoesNotViolateThrottleLimit()
  {
    final int replicationThrottleLimit = 10;
    ReplicationThrottler throttler = new ReplicationThrottler(
        ImmutableMap.of(),
        replicationThrottleLimit,
        1000
    );

    // Verify that both the tiers can be assigned replicas up to the limit
    for (int i = 0; i < replicationThrottleLimit; ++i) {
      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_1));
      throttler.incrementAssignedReplicas(TIER_1);

      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
      throttler.incrementAssignedReplicas(TIER_2);
    }

    // Verify that both the tiers are throttled once the limit has been reached.
    // Without these checks, a throttler that never throttles would pass this test.
    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_1));
    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_2));
  }

  /**
   * When a tier is already loading {@code k} replicas at the start of the run,
   * it may be assigned only {@code replicationThrottleLimit - k} more replicas.
   */
  @Test
  public void testTierWithLoadingReplicasDoesNotViolateThrottleLimit()
  {
    final int replicationThrottleLimit = 10;
    ReplicationThrottler throttler = new ReplicationThrottler(
        ImmutableMap.of(TIER_1, 10, TIER_2, 7),
        replicationThrottleLimit,
        1000
    );

    // T1 is already loading as many replicas as the limit allows,
    // so it cannot be assigned any more replicas
    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_1));

    // T2 is loading 7 replicas, so it can be assigned 3 more before it hits the limit
    for (int i = 0; i < 3; ++i) {
      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
      throttler.incrementAssignedReplicas(TIER_2);
    }
    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_2));
  }

}

0 comments on commit d6565f4

Please sign in to comment.