Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt maxSegmentsToMove based on cluster skew #14584

Merged
merged 20 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -549,26 +549,37 @@ private void stopBeingLeader()
}
}

/**
* Resets the balancerExec if required and creates a new BalancerStrategy for
* the current coordinator run.
*/
@VisibleForTesting
protected void initBalancerExecutor()
BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
{
final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();

// Reset balancerExecutor if required
if (balancerExec == null) {
balancerExec = createNewBalancerExecutor(currentNumber);
} else if (cachedBalancerThreadNumber != currentNumber) {
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
} else if (cachedBalancerThreadNumber != balancerComputeThreads) {
log.info(
"balancerComputeThreads has changed from [%d] to [%d], recreating the thread pool.",
cachedBalancerThreadNumber,
currentNumber
"'balancerComputeThreads' has changed from [%d] to [%d]",
cachedBalancerThreadNumber, balancerComputeThreads
);
balancerExec.shutdownNow();
balancerExec = createNewBalancerExecutor(currentNumber);
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
}

// Create BalancerStrategy
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
);
return balancerStrategy;
}

private ListeningExecutorService createNewBalancerExecutor(int numThreads)
{
log.info("Creating new balancer executor with [%d] threads.", numThreads);
cachedBalancerThreadNumber = numThreads;
return MoreExecutors.listeningDecorator(
Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
Expand Down Expand Up @@ -826,15 +837,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);

initBalancerExecutor();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].",
balancerStrategy.getClass().getSimpleName(),
segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
dynamicConfig.getDebugDimensions()
);

final BalancerStrategy balancerStrategy
= createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/
public class SegmentCountsPerInterval
{
private int totalSegments;
private final Map<String, Object2IntMap<Interval>>
datasourceIntervalToSegmentCount = new HashMap<>();
private final Object2IntMap<Interval> intervalToTotalSegmentCount = new Object2IntOpenHashMap<>();
Expand All @@ -47,6 +48,11 @@ public void removeSegment(DataSegment segment)
updateCountInInterval(segment, -1);
}

/**
 * Total number of segments currently tracked by this counter, across all
 * datasources and intervals. Kept up-to-date incrementally as segments are
 * added or removed via {@code updateCountInInterval}.
 */
public int getTotalSegmentCount()
{
  return totalSegments;
}

public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
{
return datasourceIntervalToSegmentCount.getOrDefault(datasource, Object2IntMaps.emptyMap());
Expand All @@ -59,6 +65,7 @@ public Object2IntMap<Interval> getIntervalToTotalSegmentCount()

private void updateCountInInterval(DataSegment segment, int delta)
{
totalSegments += delta;
intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum);
datasourceIntervalToSegmentCount
.computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ public boolean isDecommissioning()
return isDecommissioning;
}

/**
 * Whether this server has used up its assignment budget for the current
 * coordinator run, i.e. the number of assignments made in this run has
 * reached {@code maxAssignmentsInRun}. No further segments should be
 * assigned to this server in this run once the queue is full.
 */
public boolean isLoadQueueFull()
{
  return totalAssignmentsInRun >= maxAssignmentsInRun;
}

public long getAvailableSize()
{
return getMaxSize() - getSizeUsed();
Expand Down Expand Up @@ -312,6 +317,11 @@ public int getNumLoadingReplicas()
return loadingReplicaCount;
}

/**
 * Number of segments currently present in this server's queue, i.e. segments
 * that have an operation (such as load, drop or move) in progress on this
 * server.
 */
public int getNumQueuedSegments()
{
  return queuedSegments.size();
}

public boolean startOperation(SegmentAction action, DataSegment segment)
{
if (queuedSegments.containsKey(segment)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
Expand Down Expand Up @@ -60,13 +61,14 @@

private final List<ServerHolder> activeServers;
private final List<ServerHolder> decommissioningServers;
private final int totalMaxSegmentsToMove;
private final int totalSegmentsToMove;

private final int movingSegmentCount;

public TierSegmentBalancer(
String tier,
Set<ServerHolder> servers,
int maxSegmentsToMove,
DruidCoordinatorRuntimeParams params
)
{
Expand All @@ -75,7 +77,6 @@
this.segmentAssigner = params.getSegmentAssigner();

this.loadingConfig = params.getSegmentLoadingConfig();
this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
this.runStats = segmentAssigner.getStats();

Map<Boolean, List<ServerHolder>> partitions =
Expand All @@ -84,6 +85,8 @@
this.activeServers = partitions.get(false);

this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();

this.totalSegmentsToMove = getSegmentsToMove(maxSegmentsToMove);
}

public void run()
Expand All @@ -97,16 +100,17 @@
}

log.info(
"Moving max [%d] segments in tier [%s] with [%d] active servers and"
+ " [%d] decommissioning servers. There are [%d] segments already in queue.",
totalMaxSegmentsToMove, tier, activeServers.size(), decommissioningServers.size(), movingSegmentCount
"Moving max [%,d] segments in tier [%s] with [%d] active servers and"
+ " [%d] decommissioning servers. There are already [%,d] segments in queue.",
totalSegmentsToMove, tier, activeServers.size(), decommissioningServers.size(), movingSegmentCount
);
runStats.add(Stats.Segments.MAX_TO_MOVE, RowKey.of(Dimension.TIER, tier), totalSegmentsToMove);

// Move segments from decommissioning to active servers
int movedDecommSegments = 0;
if (!decommissioningServers.isEmpty()) {
int maxDecommPercentToMove = loadingConfig.getPercentDecommSegmentsToMove();
int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove * (maxDecommPercentToMove / 100.0));
int maxDecommSegmentsToMove = (int) Math.ceil(totalSegmentsToMove * (maxDecommPercentToMove / 100.0));
movedDecommSegments +=
moveSegmentsFromTo(decommissioningServers, activeServers, maxDecommSegmentsToMove);
log.info(
Expand All @@ -116,7 +120,7 @@
}

// Move segments across active servers
int maxGeneralSegmentsToMove = totalMaxSegmentsToMove - movedDecommSegments;
int maxGeneralSegmentsToMove = totalSegmentsToMove - movedDecommSegments;
int movedGeneralSegments =
moveSegmentsFromTo(activeServers, activeServers, maxGeneralSegmentsToMove);
log.info(
Expand Down Expand Up @@ -221,4 +225,44 @@
runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
}

private int getSegmentsToMove(int maxSegmentsToMove)
{
// If smartSegmentLoading is disabled, use the configured value
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
if (!dynamicConfig.isSmartSegmentLoading()) {
return maxSegmentsToMove;
}

// Move everything from decommissioning servers
int projectedDecommissioningSegments
= decommissioningServers.stream()
.mapToInt(server -> server.getProjectedSegments().getTotalSegmentCount())
.sum();
Fixed Show fixed Hide fixed

// Measure the skew in disk usage
double maxDiskUsage = 0.0f;
double minDiskUsage = 100.0f;
for (ServerHolder server : activeServers) {
double diskUsage = server.getPercentUsed();
if (diskUsage > maxDiskUsage) {
maxDiskUsage = diskUsage;
}
if (diskUsage < minDiskUsage) {
minDiskUsage = diskUsage;
}
}

// We assume that there is always a usage diff of at least 5%
// to ensure that some segments are always being balanced
double usageDiff = Math.max(10, maxDiskUsage - minDiskUsage);

// At least 100 segments must always be picked for moving
final int segmentsToMove = Math.max(100, (int) (maxSegmentsToMove * usageDiff / 100));
log.info(
"Moving max [%,d] segments in tier [%s] with max usage[%.2f%%] and min usage[%.2f%%]",
segmentsToMove, tier, maxDiskUsage, minDiskUsage
);
return segmentsToMove;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.apache.druid.server.coordinator.duty;

import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;

import java.util.Collection;

/**
*
*/
Expand All @@ -43,19 +46,20 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig = params.getSegmentLoadingConfig();
final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();

final int maxSegmentsToMove = getMaxSegmentsToMove(params);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].", maxSegmentsToMove);
return params;
} else {
log.info(
"Balancing segments in tiers [%s] with maxSegmentsToMove=[%d], maxLifetime=[%d].",
"Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove, loadingConfig.getMaxLifetimeInLoadQueue()
);
}

cluster.getHistoricals().forEach(
(tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
(tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
);

CoordinatorRunStats runStats = params.getCoordinatorStats();
Expand All @@ -66,4 +70,53 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
return params;
}

/**
 * Determines the value of {@code maxSegmentsToMove} for the current run.
 * When smart segment loading is disabled, the value configured in the dynamic
 * config is returned unchanged. When it is enabled, the value is recomputed
 * from the total number of segments on active historicals and the number of
 * balancer threads; this is only an upper bound — the actual number of
 * segments picked for moving is decided by the {@link TierSegmentBalancer}
 * based on the level of skew in each tier.
 */
private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
{
  final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
  if (!dynamicConfig.isSmartSegmentLoading()) {
    return dynamicConfig.getMaxSegmentsToMove();
  }

  final int totalSegmentsInCluster = getTotalSegmentsOnHistoricals(params.getDruidCluster());
  final int numHistoricals = getNumHistoricals(params.getDruidCluster());
  final int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads();

  final int computedMaxSegmentsToMove =
      SegmentLoadingConfig.computeMaxSegmentsToMove(totalSegmentsInCluster, numBalancerThreads);
  log.info(
      "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.",
      computedMaxSegmentsToMove, totalSegmentsInCluster, numHistoricals
  );
  return computedMaxSegmentsToMove;
}

/**
 * Counts every segment replica that will participate in cost computations:
 * all loaded, loading, dropping and moving segments across the active
 * (non-decommissioning) historicals of every tier. Evaluated here, after the
 * preceding {@link RunRules} duty, so that the assignments it made are
 * included in the count.
 */
private int getTotalSegmentsOnHistoricals(DruidCluster cluster)
{
  return cluster.getHistoricals()
                .values()
                .stream()
                .flatMap(Collection::stream)
                .filter(holder -> !holder.isDecommissioning())
                .mapToInt(holder -> holder.getNumQueuedSegments() + holder.getServer().getNumSegments())
                .sum();
}

/**
 * Total number of historicals in the cluster, across all tiers. Used only
 * for logging; decommissioning servers are included in this count.
 */
private int getNumHistoricals(DruidCluster cluster)
{
  int numHistoricals = 0;
  for (Collection<?> serversInTier : cluster.getHistoricals().values()) {
    numHistoricals += serversInTier.size();
  }
  return numHistoricals;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private void handleUnusedSegmentsForServer(
if (!usedSegments.contains(segment)
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
log.info(
log.debug(
"Dropping uneeded segment [%s] from server [%s] in tier [%s]",
segment.getId(), server.getName(), server.getTier()
);
Expand Down
Loading
Loading