Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt maxSegmentsToMove based on cluster skew #14584

Merged
merged 20 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -542,26 +542,37 @@ private void stopBeingLeader()
}
}

/**
* Resets the balancerExec if required and creates a new BalancerStrategy for
* the current coordinator run.
*/
@VisibleForTesting
protected void initBalancerExecutor()
BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
{
final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();

// Reset balancerExecutor if required
if (balancerExec == null) {
balancerExec = createNewBalancerExecutor(currentNumber);
} else if (cachedBalancerThreadNumber != currentNumber) {
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
} else if (cachedBalancerThreadNumber != balancerComputeThreads) {
log.info(
"balancerComputeThreads has changed from [%d] to [%d], recreating the thread pool.",
cachedBalancerThreadNumber,
currentNumber
"'balancerComputeThreads' has changed from [%d] to [%d]",
cachedBalancerThreadNumber, balancerComputeThreads
);
balancerExec.shutdownNow();
balancerExec = createNewBalancerExecutor(currentNumber);
balancerExec = createNewBalancerExecutor(balancerComputeThreads);
}

// Create BalancerStrategy
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
);
return balancerStrategy;
}

private ListeningExecutorService createNewBalancerExecutor(int numThreads)
{
log.info("Creating new balancer executor with [%d] threads.", numThreads);
cachedBalancerThreadNumber = numThreads;
return MoreExecutors.listeningDecorator(
Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
Expand Down Expand Up @@ -750,15 +761,15 @@ public void run()
);

log.info(
"Emitted [%d] stats for group [%s]. All collected stats:%s\n",
"Emitted [%d] stats for group [%s]. All collected stats:%s",
emittedCount.get(), dutyGroupName, allStats.buildStatsTable()
);
}

// Emit the runtime of the full DutiesRunnable
final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis);
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
Expand Down Expand Up @@ -818,15 +829,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);

initBalancerExecutor();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(balancerExec);
log.info(
"Using balancer strategy [%s] with round-robin assignment [%s] and debug dimensions [%s].",
balancerStrategy.getClass().getSimpleName(),
segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
dynamicConfig.getDebugDimensions()
);

final BalancerStrategy balancerStrategy
= createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,37 @@
*/
public class SegmentCountsPerInterval
{
private final Map<String, Object2IntMap<Interval>>
datasourceIntervalToSegmentCount = new HashMap<>();
private int totalSegments;
private long totalSegmentBytes;
private final Map<String, Object2IntMap<Interval>> datasourceIntervalToSegmentCount = new HashMap<>();
private final Object2IntMap<Interval> intervalToTotalSegmentCount = new Object2IntOpenHashMap<>();
private final Object2IntMap<String> datasourceToTotalSegmentCount = new Object2IntOpenHashMap<>();

public void addSegment(DataSegment segment)
{
updateCountInInterval(segment, 1);
totalSegmentBytes += segment.getSize();
}

public void removeSegment(DataSegment segment)
{
updateCountInInterval(segment, -1);
totalSegmentBytes -= segment.getSize();
}

public int getTotalSegmentCount()
{
return totalSegments;
}

public long getTotalSegmentBytes()
{
return totalSegmentBytes;
}

public Object2IntMap<String> getDatasourceToTotalSegmentCount()
{
return datasourceToTotalSegmentCount;
}

public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
Expand All @@ -59,7 +78,9 @@ public Object2IntMap<Interval> getIntervalToTotalSegmentCount()

private void updateCountInInterval(DataSegment segment, int delta)
{
totalSegments += delta;
intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum);
datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta, Integer::sum);
datasourceIntervalToSegmentCount
.computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>())
.mergeInt(segment.getInterval(), delta, Integer::sum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ public long getMaxSize()
return server.getMaxSize();
}

/**
* Total projected disk usage of this server in bytes.
* <p>
* The total size:
* <ol>
* <li>INCLUDES segments loaded on this server</li>
* <li>INCLUDES segments loading on this server (actions: LOAD/REPLICATE)</li>
* <li>INCLUDES segments moving to this server (action: MOVE_TO)</li>
* <li>INCLUDES segments moving from this server (action: MOVE_FROM). This is
* because these segments have only been <i>marked</i> for drop. We include
* the size of these segments to avoid over-assigning the server in case the
* corresponding MOVE_TO operation gets delayed or fails.</li>
* <li>EXCLUDES segments dropping from this server (action: DROP). Excluding
* these segments cannot result in over-assignment because drops are always
* processed before loads.</li>
* </ol>
*/
public long getSizeUsed()
{
return server.getCurrSize() + sizeOfLoadingSegments - sizeOfDroppingSegments;
Expand Down Expand Up @@ -317,6 +334,11 @@ public int getNumLoadingReplicas()
return loadingReplicaCount;
}

public int getNumQueuedSegments()
{
return queuedSegments.size();
}

public boolean startOperation(SegmentAction action, DataSegment segment)
{
if (queuedSegments.containsKey(segment)) {
Expand Down Expand Up @@ -349,7 +371,7 @@ public boolean cancelOperation(SegmentAction action, DataSegment segment)
}
}

public boolean hasSegmentLoaded(SegmentId segmentId)
private boolean hasSegmentLoaded(SegmentId segmentId)
{
return server.getSegment(segmentId) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ Iterator<ServerHolder> findServersToLoadSegment(
Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop, List<ServerHolder> serverHolders);

/**
* Returns the stats collected by the strategy in the current run and resets
* the stats collector for the next run.
* Returns the stats collected by the strategy.
*/
CoordinatorRunStats getAndResetStats();
CoordinatorRunStats getStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
Expand All @@ -68,6 +69,7 @@ public class CostBalancerStrategy implements BalancerStrategy
.thenComparing(pair -> pair.rhs);

private final CoordinatorRunStats stats = new CoordinatorRunStats();
private final AtomicLong computeTimeNanos = new AtomicLong(0);

public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
{
Expand Down Expand Up @@ -263,9 +265,13 @@ public Iterator<ServerHolder> findServersToDropSegment(
}

@Override
public CoordinatorRunStats getAndResetStats()
public CoordinatorRunStats getStats()
{
return stats.getSnapshotAndReset();
stats.add(
Stats.Balancer.COMPUTATION_TIME,
TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0))
);
return stats;
}

/**
Expand Down Expand Up @@ -351,8 +357,8 @@ private List<ServerHolder> orderServersByPlacementCost(

// Report computation stats
computeTime.stop();
stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey, computeTime.elapsed(TimeUnit.MILLISECONDS));
stats.add(Stats.Balancer.COMPUTATION_COUNT, 1);
computeTimeNanos.addAndGet(computeTime.elapsed(TimeUnit.NANOSECONDS));

return costPrioritizedServers.stream().map(pair -> pair.rhs)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop
}

@Override
public CoordinatorRunStats getAndResetStats()
public CoordinatorRunStats getStats()
{
return CoordinatorRunStats.empty();
}
Expand Down
Loading