diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 8fc6a5624ddb..9a4e4bcca629 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -396,12 +396,6 @@ public Map getBlacklistedTaskSlotCount() return Collections.emptyMap(); } - @Override - public boolean isK8sTaskRunner() - { - return true; - } - @Override public void unregisterListener(String listenerId) { @@ -457,4 +451,16 @@ public RunnerTaskState getRunnerTaskState(String taskId) return workItem.getRunnerTaskState(); } + + @Override + public int getTotalCapacity() + { + return config.getCapacity(); + } + + @Override + public int getUsedCapacity() + { + return tasks.size(); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 613e3b1031e1..8b9e126597bb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -627,4 +627,22 @@ public TaskLocation getLocation() verifyAll(); } + + @Test + public void test_getTotalCapacity() + { + Assert.assertEquals(1, runner.getTotalCapacity()); + } + + @Test + public void test_getUsedCapacity() + { + Assert.assertEquals(0, runner.getUsedCapacity()); + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); + 
runner.tasks.put(task.getId(), workItem); + Assert.assertEquals(1, runner.getUsedCapacity()); + runner.tasks.remove(task.getId()); + Assert.assertEquals(0, runner.getUsedCapacity()); + + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 4bae1d2cb254..56a62763ab68 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1639,4 +1639,16 @@ public Map getBlacklistedTaskSlotCount() return totalBlacklistedPeons; } + + @Override + public int getTotalCapacity() + { + return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); + } + + @Override + public int getUsedCapacity() + { + return getWorkers().stream().mapToInt(ImmutableWorkerInfo::getCurrCapacityUsed).sum(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 3d0eb485aa1a..13fc23f1a656 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -136,14 +136,6 @@ default TaskLocation getTaskLocation(String taskId) Map getBlacklistedTaskSlotCount(); - /** - * Beacause the k8s task runner is an extension, we need to know the task runner type in the overlord resource - */ - default boolean isK8sTaskRunner() - { - return false; - } - default void updateStatus(Task task, TaskStatus status) { // do nothing @@ -154,5 +146,22 @@ default void updateLocation(Task task, TaskLocation location) // do nothing } + /** + * The maximum number of tasks this TaskRunner can run concurrently. 
+ * Can return -1 if this method is not implemented or the capacity cannot be determined. + */ + default int getTotalCapacity() + { + return -1; + } + + /** + * The current number of tasks this TaskRunner is running. + * Can return -1 if this method is not implemented or the number of running tasks cannot be determined. + */ + default int getUsedCapacity() + { + return -1; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 57d610ab736d..7f81a359a0bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1778,6 +1778,18 @@ public Map getBlacklistedTaskSlotCount() return totalBlacklistedPeons; } + @Override + public int getTotalCapacity() + { + return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); + } + + @Override + public int getUsedCapacity() + { + return getWorkers().stream().mapToInt(ImmutableWorkerInfo::getCurrCapacityUsed).sum(); + } + private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkItem { enum State diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 3742ecb8c3ca..fa61f796154d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -32,8 +32,6 @@ import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.client.indexing.ClientTaskQuery; -import org.apache.druid.client.indexing.IndexingWorker; -import 
org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.common.exception.DruidException; @@ -469,27 +467,17 @@ public Response getWorkerConfig() public Response getTotalWorkerCapacity() { // Calculate current cluster capacity - int currentCapacity; Optional taskRunnerOptional = taskMaster.getTaskRunner(); if (!taskRunnerOptional.isPresent()) { // Cannot serve call as not leader return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); } TaskRunner taskRunner = taskRunnerOptional.get(); - Collection workers; - if (taskRunner instanceof WorkerTaskRunner) { - workers = ((WorkerTaskRunner) taskRunner).getWorkers(); - currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); - } else { - log.debug( - "Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers", - taskRunner, - taskRunner.getClass().getName() - ); - workers = ImmutableList.of(); - currentCapacity = -1; - } + Collection workers = taskRunner instanceof WorkerTaskRunner ? 
+ ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); + int currentCapacity = taskRunner.getTotalCapacity(); + int usedCapacity = taskRunner.getUsedCapacity(); // Calculate maximum capacity with auto scale int maximumCapacity; if (workerConfigRef == null) { @@ -520,7 +508,7 @@ public Response getTotalWorkerCapacity() ); maximumCapacity = -1; } - return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build(); + return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build(); } // default value is used for backwards compatibility @@ -939,24 +927,6 @@ public Response apply(TaskRunner taskRunner) { if (taskRunner instanceof WorkerTaskRunner) { return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); - } else if (taskRunner.isK8sTaskRunner()) { - // required because kubernetes task runner has no concept of a worker, so returning a dummy worker. - return Response.ok(ImmutableList.of( - new IndexingWorkerInfo( - new IndexingWorker( - "http", - "host", - "8100", - taskRunner.getTotalTaskSlotCount().getOrDefault("taskQueue", 0L).intValue(), - "version" - ), - 0, - Collections.emptySet(), - Collections.emptyList(), - DateTimes.EPOCH, - null - ) - )).build(); } else { log.debug( "Task runner [%s] of type [%s] does not support listing workers", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java index f40a8f2466a3..6ee03ed6ca0d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/TotalWorkerCapacityResponse.java @@ -37,15 +37,22 @@ public class TotalWorkerCapacityResponse * This can be -1 if it cannot be determined or if auto scaling is not configured. 
*/ private final int maximumCapacityWithAutoScale; + /** + * Used cluster capacity of the current state of the cluster. This can be -1 if + * it cannot be determined. + */ + private final int usedClusterCapacity; @JsonCreator public TotalWorkerCapacityResponse( @JsonProperty("currentClusterCapacity") int currentClusterCapacity, - @JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale + @JsonProperty("maximumCapacityWithAutoScale") int maximumCapacityWithAutoScale, + @JsonProperty("usedClusterCapacity") int usedClusterCapacity ) { this.currentClusterCapacity = currentClusterCapacity; this.maximumCapacityWithAutoScale = maximumCapacityWithAutoScale; + this.usedClusterCapacity = usedClusterCapacity; } @JsonProperty @@ -59,4 +66,10 @@ public int getMaximumCapacityWithAutoScale() { return maximumCapacityWithAutoScale; } + + @JsonProperty + public int getUsedClusterCapacity() + { + return usedClusterCapacity; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index a76766d2268c..db56811ee2c6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -147,6 +147,9 @@ public void testRun() throws Exception Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity()); + Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity()); + ListenableFuture result = remoteTaskRunner.run(task); @@ -164,6 +167,8 @@ public void 
testRun() throws Exception Assert.assertEquals(3, remoteTaskRunner.getTotalTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); + Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity()); + Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index d7feece444c2..7b2a6f618b94 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -146,6 +146,8 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size()); Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size()); + Assert.assertEquals(4, taskRunner.getTotalCapacity()); + Assert.assertEquals(0, taskRunner.getUsedCapacity()); } /* diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 4b40a40b7ab5..d244fb0ba304 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -1331,6 +1331,8 @@ public void testGetTotalWorkerCapacityWithUnknown() WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); AtomicReference workerBehaviorConfigAtomicReference = 
new AtomicReference<>(workerBehaviorConfig); EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.replay( taskRunner, taskMaster, @@ -1344,6 +1346,7 @@ public void testGetTotalWorkerCapacityWithUnknown() final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @@ -1352,6 +1355,8 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi { AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(null); EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.replay( taskRunner, taskMaster, @@ -1365,6 +1370,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @@ -1374,6 
+1380,8 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null); AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); + EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); EasyMock.replay( taskRunner, taskMaster, @@ -1387,6 +1395,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @@ -1408,6 +1417,9 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra ) ); EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); + EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); + EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0); + EasyMock.reset(taskMaster); EasyMock.expect(taskMaster.getTaskRunner()).andReturn( Optional.of(workerTaskRunner) @@ -1435,6 +1447,7 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @@ -1442,20 +1455,24 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity() { int invalidExpectedCapacity = -1; + int currentTotalCapacity = 3; + int currentCapacityUsed = 2; int maxNumWorkers = 2; WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); Collection workerInfos = ImmutableList.of( new ImmutableWorkerInfo( new Worker( - "http", "testWorker", "192.0.0.1", 3, "v1", WorkerConfig.DEFAULT_CATEGORY + "http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY ), - 2, + currentCapacityUsed, ImmutableSet.of("grp1", "grp2"), ImmutableSet.of("task1", "task2"), DateTimes.of("2015-01-01T01:01:01Z") ) ); EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); + EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity); + EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed); EasyMock.reset(taskMaster); EasyMock.expect(taskMaster.getTaskRunner()).andReturn( Optional.of(workerTaskRunner) @@ -1484,6 +1501,8 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); + 
Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); } @Test diff --git a/web-console/src/helpers/capacity.ts b/web-console/src/helpers/capacity.ts index aaed5903dc23..aa29efc68e9f 100644 --- a/web-console/src/helpers/capacity.ts +++ b/web-console/src/helpers/capacity.ts @@ -16,24 +16,17 @@ * limitations under the License. */ -import { sum } from 'd3-array'; - import type { CapacityInfo } from '../druid-models'; import { Api } from '../singletons'; export async function getClusterCapacity(): Promise { - const workersResponse = await Api.instance.get('/druid/indexer/v1/workers', { + const workersResponse = await Api.instance.get('/druid/indexer/v1/totalWorkerCapacity', { timeout: 5000, }); - const usedTaskSlots = sum( - workersResponse.data, - (workerInfo: any) => Number(workerInfo.currCapacityUsed) || 0, - ); + const usedTaskSlots = Number(workersResponse.data.usedClusterCapacity); - const totalTaskSlots = sum(workersResponse.data, (workerInfo: any) => - Number(workerInfo.worker.capacity), - ); + const totalTaskSlots = Number(workersResponse.data.currentClusterCapacity); return { availableTaskSlots: totalTaskSlots - usedTaskSlots,