Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix capacity response in mm-less ingestion #14888

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,6 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
return Collections.emptyMap();
}

@Override
public boolean isK8sTaskRunner()
{
return true;
}

@Override
public void unregisterListener(String listenerId)
{
Expand Down Expand Up @@ -457,4 +451,16 @@ public RunnerTaskState getRunnerTaskState(String taskId)

return workItem.getRunnerTaskState();
}

@Override
public int getTotalCapacity()
{
return config.getCapacity();
}

@Override
public int getUsedCapacity()
{
return tasks.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -627,4 +627,22 @@ public TaskLocation getLocation()

verifyAll();
}

@Test
public void test_getTotalCapacity()
{
Assert.assertEquals(1, runner.getTotalCapacity());
}

@Test
public void test_getUsedCapacity()
{
Assert.assertEquals(0, runner.getUsedCapacity());
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
runner.tasks.put(task.getId(), workItem);
Assert.assertEquals(1, runner.getUsedCapacity());
runner.tasks.remove(task.getId());
Assert.assertEquals(0, runner.getUsedCapacity());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1639,4 +1639,16 @@ public Map<String, Long> getBlacklistedTaskSlotCount()

return totalBlacklistedPeons;
}

@Override
public int getTotalCapacity()
{
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}

@Override
public int getUsedCapacity()
{
return getWorkers().stream().mapToInt(ImmutableWorkerInfo::getCurrCapacityUsed).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,6 @@ default TaskLocation getTaskLocation(String taskId)

Map<String, Long> getBlacklistedTaskSlotCount();

/**
* Beacause the k8s task runner is an extension, we need to know the task runner type in the overlord resource
*/
default boolean isK8sTaskRunner()
{
return false;
}

default void updateStatus(Task task, TaskStatus status)
{
// do nothing
Expand All @@ -154,5 +146,14 @@ default void updateLocation(Task task, TaskLocation location)
// do nothing
}

default int getTotalCapacity()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best add javadocs to these methods esp to point out that they can return -1 when not implemented or can't be determined.

{
return -1;
}

default int getUsedCapacity()
{
return -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,18 @@ public Map<String, Long> getBlacklistedTaskSlotCount()
return totalBlacklistedPeons;
}

@Override
public int getTotalCapacity()
{
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}

@Override
public int getUsedCapacity()
{
return getWorkers().stream().mapToInt(ImmutableWorkerInfo::getCurrCapacityUsed).sum();
}

private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem
{
enum State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.exception.DruidException;
Expand Down Expand Up @@ -469,27 +467,17 @@ public Response getWorkerConfig()
public Response getTotalWorkerCapacity()
{
// Calculate current cluster capacity
int currentCapacity;
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (!taskRunnerOptional.isPresent()) {
// Cannot serve call as not leader
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
TaskRunner taskRunner = taskRunnerOptional.get();
Collection<ImmutableWorkerInfo> workers;
if (taskRunner instanceof WorkerTaskRunner) {
workers = ((WorkerTaskRunner) taskRunner).getWorkers();
currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
} else {
log.debug(
"Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers",
taskRunner,
taskRunner.getClass().getName()
);
workers = ImmutableList.of();
currentCapacity = -1;
}
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the list of workers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for capacity with autoscaling, it looks like if capacity hint is not passed it tries to assume each worker has the same capacity when calculating max capacity, so it needs at least a single worker as an example.

((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();

int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
if (workerConfigRef == null) {
Expand Down Expand Up @@ -520,7 +508,7 @@ public Response getTotalWorkerCapacity()
);
maximumCapacity = -1;
}
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build();
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build();
}

// default value is used for backwards compatibility
Expand Down Expand Up @@ -939,24 +927,6 @@ public Response apply(TaskRunner taskRunner)
{
if (taskRunner instanceof WorkerTaskRunner) {
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build();
} else if (taskRunner.isK8sTaskRunner()) {
// required because kubernetes task runner has no concept of a worker, so returning a dummy worker.
return Response.ok(ImmutableList.of(
new IndexingWorkerInfo(
new IndexingWorker(
"http",
"host",
"8100",
taskRunner.getTotalTaskSlotCount().getOrDefault("taskQueue", 0L).intValue(),
"version"
),
0,
Collections.emptySet(),
Collections.emptyList(),
DateTimes.EPOCH,
null
)
)).build();
} else {
log.debug(
"Task runner [%s] of type [%s] does not support listing workers",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,22 @@ public class TotalWorkerCapacityResponse
* This can be -1 if it cannot be determined or if auto scaling is not configured.
*/
private final int maximumCapacityWithAutoScale;
/**
* Used cluster capacity of the current state of the cluster. This can be -1 if
* it cannot be determined.
*/
private final int usedClusterCapacity;

@JsonCreator
public TotalWorkerCapacityResponse(
@JsonProperty("currentClusterCapacity") int currentClusterCapacity,
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale
@JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale,
@JsonProperty("usedClusterCapacity") int usedClusterCapacity
)
{
this.currentClusterCapacity = currentClusterCapacity;
this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale;
this.usedClusterCapacity = usedClusterCapacity;
}

@JsonProperty
Expand All @@ -59,4 +66,10 @@ public int getMaximumCapacityWithAutoScale()
{
return maximumCapacityWithAutoScale;
}

@JsonProperty
public int getUsedClusterCapacity()
{
return usedClusterCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public void testRun() throws Exception
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());


ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);

Expand All @@ -164,6 +167,8 @@ public void testRun() throws Exception
Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo

Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
Assert.assertEquals(4, taskRunner.getTotalCapacity());
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,8 @@ public void testGetTotalWorkerCapacityWithUnknown()
WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
Expand All @@ -1344,6 +1346,7 @@ public void testGetTotalWorkerCapacityWithUnknown()
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}

Expand All @@ -1352,6 +1355,8 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi
{
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(null);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
Expand All @@ -1365,6 +1370,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}

Expand All @@ -1374,6 +1380,8 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu
DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null);
AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig);
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
Expand All @@ -1387,6 +1395,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}

Expand All @@ -1408,6 +1417,9 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0);

EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
Expand Down Expand Up @@ -1435,27 +1447,32 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
}

@Test
public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
{
int invalidExpectedCapacity = -1;
int currentTotalCapacity = 3;
int currentCapacityUsed = 2;
int maxNumWorkers = 2;
WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class);
Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
new ImmutableWorkerInfo(
new Worker(
"http", "testWorker", "192.0.0.1", 3, "v1", WorkerConfig.DEFAULT_CATEGORY
"http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY
),
2,
currentCapacityUsed,
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:01Z")
)
);
EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity);
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed);
EasyMock.reset(taskMaster);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.of(workerTaskRunner)
Expand Down Expand Up @@ -1484,6 +1501,8 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
}

@Test
Expand Down
13 changes: 3 additions & 10 deletions web-console/src/helpers/capacity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@
* limitations under the License.
*/

import { sum } from 'd3-array';

import type { CapacityInfo } from '../druid-models';
import { Api } from '../singletons';

export async function getClusterCapacity(): Promise<CapacityInfo> {
const workersResponse = await Api.instance.get('/druid/indexer/v1/workers', {
const workersResponse = await Api.instance.get('/druid/indexer/v1/totalWorkerCapacity', {
timeout: 5000,
});

const usedTaskSlots = sum(
workersResponse.data,
(workerInfo: any) => Number(workerInfo.currCapacityUsed) || 0,
);
const usedTaskSlots = Number(workersResponse.data.usedClusterCapacity);

const totalTaskSlots = sum(workersResponse.data, (workerInfo: any) =>
Number(workerInfo.worker.capacity),
);
const totalTaskSlots = Number(workersResponse.data.currentClusterCapacity);

return {
availableTaskSlots: totalTaskSlots - usedTaskSlots,
Expand Down