Skip to content

Commit

Permalink
Discard changes to server/src/main/java/org/apache/druid/server/coord…
Browse files Browse the repository at this point in the history
…inator/CostBalancerStrategy.java
  • Loading branch information
317brian authored Jun 13, 2023
1 parent 934948c commit 72f1699
Showing 1 changed file with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.math3.util.FastMath;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand All @@ -36,6 +37,8 @@
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class CostBalancerStrategy implements BalancerStrategy
Expand Down Expand Up @@ -226,7 +229,7 @@ public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet

try {
// results is an un-ordered list of a pair consisting of the 'cost' of a segment being on a server and the server
List<Pair<Double, ServerHolder>> results = resultsFuture.get();
List<Pair<Double, ServerHolder>> results = resultsFuture.get(1, TimeUnit.MINUTES);
return results.stream()
// Comparator.comparingDouble will order by lowest cost...
// reverse it because we want to drop from the highest cost servers first
Expand All @@ -235,7 +238,7 @@ public Iterator<ServerHolder> pickServersToDrop(DataSegment toDrop, NavigableSet
.iterator();
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
alertOnFailure(e, "pick drop server");
}
return Collections.emptyIterator();
}
Expand Down Expand Up @@ -298,10 +301,7 @@ public void emitStats(String tier, CoordinatorStats stats, List<ServerHolder> se

log.info(
"[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial Normalized Cost: [%f]",
tier,
initialTotalCost,
normalization,
normalizedInitialCost
tier, initialTotalCost, normalization, normalizedInitialCost
);
}

Expand Down Expand Up @@ -373,7 +373,7 @@ protected Pair<Double, ServerHolder> chooseBestServer(
final List<Pair<Double, ServerHolder>> bestServers = new ArrayList<>();
bestServers.add(bestServer);
try {
for (Pair<Double, ServerHolder> server : resultsFuture.get()) {
for (Pair<Double, ServerHolder> server : resultsFuture.get(1, TimeUnit.MINUTES)) {
if (server.lhs <= bestServers.get(0).lhs) {
if (server.lhs < bestServers.get(0).lhs) {
bestServers.clear();
Expand All @@ -390,9 +390,28 @@ protected Pair<Double, ServerHolder> chooseBestServer(
bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
}
catch (Exception e) {
log.makeAlert(e, "Cost Balancer Multithread strategy wasn't able to complete cost computation.").emit();
alertOnFailure(e, "choose best load server");
}
return bestServer;
}

/**
 * Reports a failed balancer action.
 * <p>
 * If the balancer executor has already been shut down, only an info-level
 * message (without stack trace) is logged and no alert is raised. Otherwise
 * an alert is emitted; when the failure is a {@link TimeoutException} the
 * alert text additionally suggests raising 'balancerComputeThreads'.
 *
 * @param e      the exception that caused the action to fail
 * @param action short description of the action that failed, used in the message
 */
private void alertOnFailure(Exception e, String action)
{
  if (exec.isShutdown()) {
    // A shut-down executor is not treated as alert-worthy; just note it and bail out.
    log.noStackTrace().info("Balancer executor was terminated. Failing action [%s].", action);
  } else {
    final String outcome;
    final String hint;
    if (e instanceof TimeoutException) {
      outcome = "timed out";
      hint = " Try setting a higher value of 'balancerComputeThreads'.";
    } else {
      outcome = "failed";
      hint = "";
    }

    final String alertText = StringUtils.format(
        "Cost balancer strategy %s in action [%s].%s",
        outcome,
        action,
        hint
    );
    log.makeAlert(e, alertText).emit();
  }
}

}

0 comments on commit 72f1699

Please sign in to comment.