-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix capacity response in mm-less ingestion #14888
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,8 +32,6 @@ | |
import org.apache.druid.audit.AuditInfo; | ||
import org.apache.druid.audit.AuditManager; | ||
import org.apache.druid.client.indexing.ClientTaskQuery; | ||
import org.apache.druid.client.indexing.IndexingWorker; | ||
import org.apache.druid.client.indexing.IndexingWorkerInfo; | ||
import org.apache.druid.common.config.ConfigManager.SetResult; | ||
import org.apache.druid.common.config.JacksonConfigManager; | ||
import org.apache.druid.common.exception.DruidException; | ||
|
@@ -469,27 +467,17 @@ public Response getWorkerConfig() | |
public Response getTotalWorkerCapacity() | ||
{ | ||
// Calculate current cluster capacity | ||
int currentCapacity; | ||
Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner(); | ||
if (!taskRunnerOptional.isPresent()) { | ||
// Cannot serve call as not leader | ||
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); | ||
} | ||
TaskRunner taskRunner = taskRunnerOptional.get(); | ||
Collection<ImmutableWorkerInfo> workers; | ||
if (taskRunner instanceof WorkerTaskRunner) { | ||
workers = ((WorkerTaskRunner) taskRunner).getWorkers(); | ||
currentCapacity = workers.stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); | ||
} else { | ||
log.debug( | ||
"Cannot calculate capacity as task runner [%s] of type [%s] does not support listing workers", | ||
taskRunner, | ||
taskRunner.getClass().getName() | ||
); | ||
workers = ImmutableList.of(); | ||
currentCapacity = -1; | ||
} | ||
Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still need the list of workers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for capacity with autoscaling, it looks like if capacity hint is not passed it tries to assume each worker has the same capacity when calculating max capacity, so it needs at least a single worker as an example. |
||
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); | ||
|
||
int currentCapacity = taskRunner.getTotalCapacity(); | ||
int usedCapacity = taskRunner.getUsedCapacity(); | ||
// Calculate maximum capacity with auto scale | ||
int maximumCapacity; | ||
if (workerConfigRef == null) { | ||
|
@@ -520,7 +508,7 @@ public Response getTotalWorkerCapacity() | |
); | ||
maximumCapacity = -1; | ||
} | ||
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity)).build(); | ||
return Response.ok(new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity)).build(); | ||
} | ||
|
||
// default value is used for backwards compatibility | ||
|
@@ -939,24 +927,6 @@ public Response apply(TaskRunner taskRunner) | |
{ | ||
if (taskRunner instanceof WorkerTaskRunner) { | ||
return Response.ok(((WorkerTaskRunner) taskRunner).getWorkers()).build(); | ||
} else if (taskRunner.isK8sTaskRunner()) { | ||
// required because kubernetes task runner has no concept of a worker, so returning a dummy worker. | ||
return Response.ok(ImmutableList.of( | ||
new IndexingWorkerInfo( | ||
new IndexingWorker( | ||
"http", | ||
"host", | ||
"8100", | ||
taskRunner.getTotalTaskSlotCount().getOrDefault("taskQueue", 0L).intValue(), | ||
"version" | ||
), | ||
0, | ||
Collections.emptySet(), | ||
Collections.emptyList(), | ||
DateTimes.EPOCH, | ||
null | ||
) | ||
)).build(); | ||
} else { | ||
log.debug( | ||
"Task runner [%s] of type [%s] does not support listing workers", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Best add javadocs to these methods esp to point out that they can return -1 when not implemented or can't be determined.