From d6565f46b00108e008d492cc1df8a58559ea4cd2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 28 Aug 2023 18:20:22 +0530 Subject: [PATCH] Increase the computed value of replicationThrottleLimit (#14913) Changes - Increase value of `replicationThrottleLimit` computed by `smartSegmentLoading` from 2% to 5% of total number of used segments. - Assign replicas to a tier even when some replicas are already being loaded in that tier - Limit the total number of replicas in load queue at start of run + replica assignments in the run to the `replicationThrottleLimit`. i.e. for every tier, num loading replicas at start of run + num replicas assigned in run <= replicationThrottleLimit --- docs/configuration/index.md | 2 +- .../loading/ReplicationThrottler.java | 43 +++++------ .../loading/SegmentLoadingConfig.java | 2 +- .../loading/StrategicSegmentAssigner.java | 16 ++--- .../loading/ReplicationThrottlerTest.java | 72 +++++++++++++++++++ 5 files changed, 101 insertions(+), 34 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index b16e676913dd..da8ce4235b3c 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -984,7 +984,7 @@ Druid computes the values to optimize Coordinator performance, based on the curr |--------|--------------|-----------| |`useRoundRobinSegmentAssignment`|true|Speeds up segment assignment.| |`maxSegmentsInNodeLoadingQueue`|0|Removes the limit on load queue size.| -|`replicationThrottleLimit`|2% of used segments, minimum value 100|Prevents aggressive replication when a historical disappears only intermittently.| +|`replicationThrottleLimit`|5% of used segments, minimum value 100|Prevents aggressive replication when a historical disappears only intermittently.| |`replicantLifetime`|60|Allows segments to wait about an hour (assuming a Coordinator period of 1 minute) in the load queue before an alert is raised. In `smartSegmentLoading` mode, load queues are not limited by size. Segments might therefore assigned to a load queue even if the corresponding server is slow to load them.| |`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This throttling is already handled by `replicationThrottleLimit`.| |`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value 1000|Ensures that some segments are always moving in the cluster to keep it well balanced. The maximum value keeps the Coordinator run times bounded.| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java index 988a181707a1..55f5143f31b3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java @@ -19,12 +19,10 @@ package org.apache.druid.server.coordinator.loading; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; /** * The ReplicationThrottler is used to throttle the number of segment replicas @@ -43,24 +41,27 @@ public class ReplicationThrottler private final int replicationThrottleLimit; private final int maxReplicaAssignmentsInRun; - private final Map tierToNumAssigned = new HashMap<>(); - private final Set tiersLoadingReplicas = new HashSet<>(); + private final Object2IntOpenHashMap tierToNumAssigned = new Object2IntOpenHashMap<>(); + private final Object2IntOpenHashMap tierToMaxAssignments = new Object2IntOpenHashMap<>(); private int totalReplicasAssignedInRun; /** * Creates a new ReplicationThrottler for use during a single coordinator run. + * The number of replicas loading on a tier must always be within the current + * {@code replicationThrottleLimit}. Thus, if a tier was already loading {@code k} + * replicas at the start of a coordinator run, it may be assigned only + * {@code replicationThrottleLimit - k} more replicas during the run. * - * @param tiersLoadingReplicas Set of tier names which are already loading - * replicas and will not be eligible for loading - * more replicas in this run. + * @param tierToLoadingReplicaCount Map from tier name to number of replicas + * already being loaded. * @param replicationThrottleLimit Maximum number of replicas that can be * assigned to a single tier in the current run. * @param maxReplicaAssignmentsInRun Max number of total replicas that can be * assigned across all tiers in the current run. */ public ReplicationThrottler( - Set tiersLoadingReplicas, + Map tierToLoadingReplicaCount, int replicationThrottleLimit, int maxReplicaAssignmentsInRun ) @@ -68,27 +69,27 @@ public ReplicationThrottler( this.replicationThrottleLimit = replicationThrottleLimit; this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun; this.totalReplicasAssignedInRun = 0; - if (tiersLoadingReplicas != null) { - this.tiersLoadingReplicas.addAll(tiersLoadingReplicas); - } - } - public boolean isTierLoadingReplicas(String tier) - { - return tiersLoadingReplicas.contains(tier); + if (tierToLoadingReplicaCount != null) { + tierToLoadingReplicaCount.forEach( + (tier, numLoadingReplicas) -> tierToMaxAssignments.addTo( + tier, + Math.max(0, replicationThrottleLimit - numLoadingReplicas) + ) + ); + } } - public boolean canAssignReplica(String tier) + public boolean isReplicationThrottledForTier(String tier) { - return !tiersLoadingReplicas.contains(tier) - && totalReplicasAssignedInRun < maxReplicaAssignmentsInRun - && tierToNumAssigned.computeIfAbsent(tier, t -> 0) < replicationThrottleLimit; + return tierToNumAssigned.getInt(tier) >= tierToMaxAssignments.getOrDefault(tier, replicationThrottleLimit) + || totalReplicasAssignedInRun >= maxReplicaAssignmentsInRun; } public void incrementAssignedReplicas(String tier) { ++totalReplicasAssignedInRun; - tierToNumAssigned.compute(tier, (t, count) -> (count == null) ? 1 : count + 1); + tierToNumAssigned.addTo(tier, 1); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java index 25159cc2eb78..d1f01043ba24 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java @@ -48,7 +48,7 @@ public static SegmentLoadingConfig create(CoordinatorDynamicConfig dynamicConfig { if (dynamicConfig.isSmartSegmentLoading()) { // Compute replicationThrottleLimit with a lower bound of 100 - final int throttlePercentage = 2; + final int throttlePercentage = 5; final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100); log.info( "Smart segment loading is enabled. Calculated replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).", diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index 8bb82c5cdf69..0cc57db98809 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -497,7 +497,7 @@ private int loadReplicas( final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1; // Do not assign replicas if tier is already busy loading some - if (isAlreadyLoadedOnTier && replicationThrottler.isTierLoadingReplicas(tier)) { + if (isAlreadyLoadedOnTier && replicationThrottler.isReplicationThrottledForTier(tier)) { return 0; } @@ -543,7 +543,7 @@ private boolean loadSegment(DataSegment segment, ServerHolder server) private boolean replicateSegment(DataSegment segment, ServerHolder server) { final String tier = server.getServer().getTier(); - if (!replicationThrottler.canAssignReplica(tier)) { + if (replicationThrottler.isReplicationThrottledForTier(tier)) { incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "Throttled replication", segment, tier); return false; } @@ -563,22 +563,16 @@ private static ReplicationThrottler createReplicationThrottler( SegmentLoadingConfig loadingConfig ) { - final Set tiersLoadingReplicas = new HashSet<>(); + final Map tierToLoadingReplicaCount = new HashMap<>(); cluster.getHistoricals().forEach( (tier, historicals) -> { int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum(); - if (numLoadingReplicas > 0) { - log.info( - "Tier [%s] will not be assigned replicas as it is already loading [%d] replicas.", - tier, numLoadingReplicas - ); - tiersLoadingReplicas.add(tier); - } + tierToLoadingReplicaCount.put(tier, numLoadingReplicas); } ); return new ReplicationThrottler( - tiersLoadingReplicas, + tierToLoadingReplicaCount, loadingConfig.getReplicationThrottleLimit(), loadingConfig.getMaxReplicaAssignmentsInRun() ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java new file mode 100644 index 000000000000..4e1de51d36ab --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +public class ReplicationThrottlerTest +{ + private static final String TIER_1 = "t1"; + private static final String TIER_2 = "t2"; + + @Test + public void testTierDoesNotViolateThrottleLimit() + { + final int replicationThrottleLimit = 10; + ReplicationThrottler throttler = new ReplicationThrottler( + ImmutableMap.of(), + replicationThrottleLimit, + 1000 + ); + + // Verify that both the tiers can be assigned replicas upto the limit + for (int i = 0; i < replicationThrottleLimit; ++i) { + Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_1)); + throttler.incrementAssignedReplicas(TIER_1); + + Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2)); + throttler.incrementAssignedReplicas(TIER_2); + } + } + + @Test + public void testTierWithLoadingReplicasDoesNotViolateThrottleLimit() + { + final int replicationThrottleLimit = 10; + ReplicationThrottler throttler = new ReplicationThrottler( + ImmutableMap.of(TIER_1, 10, TIER_2, 7), + replicationThrottleLimit, + 1000 + ); + + // T1 cannot be assigned any more replicas + Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_1)); + + // T2 can be assigned replicas until it hits the limit + for (int i = 0; i < 3; ++i) { + Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2)); + throttler.incrementAssignedReplicas(TIER_2); + } + Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_2)); + } + +}