Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New balancer strategy: sortingCost #13254

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.server.coordinator.balancer.ClusterCostComputer;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.timeline.DataSegment;

/**
* This cost strategy relies on dividing segments into time buckets
* The intervals within a bucket are sorted to allow for faster cost computation
* This strategy is based on ideas utilized in CachingCostBalancerStrategy
* However, CachingCostBalancerStrategy has 2 major issues which this strategy tries to fix:
* 1) The value / decisions may differ when segments of multiple granularities are present
* 2) A cache is slow to build : O(N ^ 2)
* This strategy tries to fix it while also being as just as fast in computing the cost
*/
public class SortingCostBalancerStrategy extends CostBalancerStrategy
{
private final ClusterCostComputer costComputer;

public SortingCostBalancerStrategy(
ServerInventoryView serverInventoryView,
LoadQueueTaskMaster loadQueueTaskMaster,
ListeningExecutorService exec
)
{
super(exec);
costComputer = new ClusterCostComputer(serverInventoryView, loadQueueTaskMaster);
}

@Override
protected double computePlacementCost(DataSegment segment, ServerHolder server)
{
final String serverName = server.getServer().getName();
double cost = costComputer.computeCost(serverName, segment);
if (server.isProjectedSegment(segment)) {
cost -= intervalCost(segment.getInterval(), segment.getInterval());
}
return cost;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.annotation.JacksonInject;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;

public class SortingCostBalancerStrategyFactory extends CostBalancerStrategyFactory
{
private final ServerInventoryView serverInventoryView;
private final LoadQueueTaskMaster loadQueueTaskMaster;


public SortingCostBalancerStrategyFactory(
@JacksonInject ServerInventoryView serverInventoryView,
@JacksonInject LoadQueueTaskMaster loadQueueTaskMaster
)
{
this.serverInventoryView = serverInventoryView;
this.loadQueueTaskMaster = loadQueueTaskMaster;
}

@Override
public CostBalancerStrategy createBalancerStrategy(final int numBalancerThreads)
{
return new SortingCostBalancerStrategy(
serverInventoryView,
loadQueueTaskMaster,
getOrCreateBalancerExecutor(1)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.SortingCostBalancerStrategyFactory;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cachingCost", value = DisabledCachingCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "sortingCost", value = SortingCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class)
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.balancer;

import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.Map;

/**
* Maintain ServerCostComputers per historical
*/
public class ClusterCostComputer
{
private final Map<String, ServerCostComputer> serverCostComputerMap = new HashMap<>();

public ClusterCostComputer(ServerInventoryView serverInventoryView, LoadQueueTaskMaster loadQueueTaskMaster)
{
for (DruidServer server : serverInventoryView.getInventory()) {
serverCostComputerMap.put(
server.getName(),
new ServerCostComputer(server, loadQueueTaskMaster.getPeonForServer(server.toImmutableDruidServer()))
);
}
}

public double computeCost(String serverName, DataSegment dataSegment)
{
ServerCostComputer serverCostCache = serverCostComputerMap.get(serverName);
return (serverCostCache != null) ? serverCostCache.computeCost(dataSegment) : 0.0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public class CostBalancerStrategy implements BalancerStrategy
* Comparator that prioritizes servers by cost. Cheaper servers come before
* costlier servers. Servers with the same cost may appear in a random order.
*/
private static final Comparator<Pair<Double, ServerHolder>> CHEAPEST_SERVERS_FIRST
protected static final Comparator<Pair<Double, ServerHolder>> CHEAPEST_SERVERS_FIRST
= Comparator.<Pair<Double, ServerHolder>, Double>comparing(pair -> pair.lhs)
.thenComparing(pair -> pair.rhs);

private final CoordinatorRunStats stats = new CoordinatorRunStats();
private final AtomicLong computeTimeNanos = new AtomicLong(0);
protected final CoordinatorRunStats stats = new CoordinatorRunStats();
protected final AtomicLong computeTimeNanos = new AtomicLong(0);

public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.balancer;

import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Maintains the sorting cost computers on a server level
*/
public class ServerCostComputer
{
private final SortingCostComputer serverComputer;
private final Map<String, SortingCostComputer> computerPerDatasource = new HashMap<>();

ServerCostComputer(
DruidServer server,
LoadQueuePeon peon
)
{
Set<DataSegment> segments = new HashSet<>();
Map<String, Set<DataSegment>> datasourceSegments = new HashMap<>();
if (peon != null) {
for (DataSegment segment : peon.getSegmentsToLoad()) {
segments.add(segment);
String dsName = segment.getDataSource();
datasourceSegments.computeIfAbsent(dsName, ds -> new HashSet<>()).add(segment);
}
}
for (DruidDataSource dataSource : server.getDataSources()) {
final String dsName = dataSource.getName();
datasourceSegments.computeIfAbsent(dsName, ds -> new HashSet<>())
.addAll(dataSource.getSegments());
computerPerDatasource.put(
dsName,
new SortingCostComputer(datasourceSegments.get(dsName))
);
segments.addAll(dataSource.getSegments());
}
serverComputer = new SortingCostComputer(segments);
}

double computeCost(DataSegment segment)
{
return serverComputer.cost(segment) + computeDataSourceCost(segment);
}

private double computeDataSourceCost(DataSegment segment)
{
SortingCostComputer datasourceComputer = computerPerDatasource.get(segment.getDataSource());
return (datasourceComputer == null) ? 0.0 : datasourceComputer.cost(segment);
}
}
Loading
Loading