diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 78652a70b919..8d3380aff68a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -793,7 +793,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
     // defend against consecutive reset requests from replicas
     // as well as the case where the metadata store do not have an entry for the reset partitions
     boolean doReset = false;
-    for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
+    for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
                                                                          .getPartitionOffsetMap()
                                                                          .entrySet()) {
       final Long partitionOffsetInMetadataStore = currentMetadata == null
@@ -876,7 +876,7 @@ void gracefulShutdownInternal() throws ExecutionException, InterruptedException,
     // checkTaskDuration() will be triggered. This is better than just telling these tasks to publish whatever they
     // have, as replicas that are supposed to publish the same segment may not have read the same set of offsets.
     for (TaskGroup taskGroup : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
         if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
           killTask(entry.getKey());
         } else {
@@ -915,7 +915,7 @@ String generateSequenceName(
   {
     StringBuilder sb = new StringBuilder();

-    for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
+    for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
       sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue()));
     }
     String partitionOffsetStr = sb.toString().substring(1);
@@ -1083,7 +1083,7 @@ public Boolean apply(KafkaIndexTask.Status status)
                 // existing) so that the next tasks will start reading from where this task left off
                 Map<Integer, Long> publishingTaskEndOffsets = taskClient.getEndOffsets(taskId);

-                for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
+                for (Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
                   Integer partition = entry.getKey();
                   Long offset = entry.getValue();
                   ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
@@ -1375,7 +1375,7 @@ private void updateTaskStatus() throws ExecutionException, InterruptedException,
     // update status (and startTime if unknown) of current tasks in taskGroups
     for (TaskGroup group : taskGroups.values()) {
-      for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+      for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
         final String taskId = entry.getKey();
         final TaskData taskData = entry.getValue();
@@ -1418,7 +1418,7 @@ public Boolean apply(@Nullable DateTime startTime)
     // update status of pending completion tasks in pendingCompletionTaskGroups
     for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
       for (TaskGroup group : taskGroups) {
-        for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
           entry.getValue().status = taskStorage.getStatus(entry.getKey()).get();
         }
       }
@@ -1441,7 +1441,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
     final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
     final List<Integer> futureGroupIds = Lists.newArrayList();

-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       Integer groupId = entry.getKey();
       TaskGroup group = entry.getValue();
@@ -1474,7 +1474,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
         pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);

         // set endOffsets as the next startOffsets
-        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
+        for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
           partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
         }
       } else {
@@ -1500,9 +1500,9 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup
   {
     if (finalize) {
       // 1) Check if any task completed (in which case we're done) and kill unassigned tasks
-      Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
       while (i.hasNext()) {
-        Map.Entry<String, TaskData> taskEntry = i.next();
+        Entry<String, TaskData> taskEntry = i.next();
         String taskId = taskEntry.getKey();
         TaskData task = taskEntry.getValue();
@@ -1564,7 +1564,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
               taskGroup.tasks.remove(taskId);
             } else {
               // otherwise build a map of the highest offsets seen
-              for (Map.Entry<Integer, Long> offset : result.entrySet()) {
+              for (Entry<Integer, Long> offset : result.entrySet()) {
                 if (!endOffsets.containsKey(offset.getKey())
                     || endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
                   endOffsets.put(offset.getKey(), offset.getValue());
@@ -1642,7 +1642,7 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();

-    for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
+    for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
       boolean stopTasksInTaskGroup = false;
       Integer groupId = pendingGroupList.getKey();
@@ -1723,9 +1723,9 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
   private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
   {
     List<ListenableFuture<?>> futures = Lists.newArrayList();
-    Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
+    Iterator<Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
     while (iTaskGroups.hasNext()) {
-      Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
+      Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
       Integer groupId = taskGroupEntry.getKey();
       TaskGroup taskGroup = taskGroupEntry.getValue();
@@ -1737,9 +1737,9 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
       log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());

-      Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
+      Iterator<Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
-        Map.Entry<String, TaskData> task = iTasks.next();
+        Entry<String, TaskData> task = iTasks.next();
         String taskId = task.getKey();
         TaskData taskData = task.getValue();
@@ -1810,7 +1810,7 @@ void createNewTasks() throws JsonProcessingException
     // iterate through all the current task groups and make sure each one has the desired number of replica tasks
     boolean createdTask = false;
-    for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+    for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
       TaskGroup taskGroup = entry.getValue();
       Integer groupId = entry.getKey();
@@ -1898,7 +1898,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
   private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId)
   {
     ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
-    for (Map.Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
+    for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
       Integer partition = entry.getKey();
       Long offset = entry.getValue();
@@ -2023,7 +2023,7 @@ private ListenableFuture<Void> stopTasksInGroup(@Nullable TaskGroup taskGroup)
     }

    final List<ListenableFuture<Void>> futures = Lists.newArrayList();
-    for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+    for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
      final String taskId = entry.getKey();
      final TaskData taskData = entry.getValue();
      if (taskData.status == null) {
@@ -2108,7 +2108,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
     try {
       for (TaskGroup taskGroup : taskGroups.values()) {
-        for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+        for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
           String taskId = entry.getKey();
           @Nullable DateTime startTime = entry.getValue().startTime;
           Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
@@ -2135,7 +2135,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in
       for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
         for (TaskGroup taskGroup : taskGroups) {
-          for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+          for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
             String taskId = entry.getKey();
             @Nullable DateTime startTime = entry.getValue().startTime;
             Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
@@ -2203,7 +2203,7 @@ private Map<Integer, Long> getHighestCurrentOffsets()
         .stream()
         .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
         .flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream())
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
+        .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
   }

   private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
@@ -2213,7 +2213,7 @@ private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
         .stream()
         .collect(
             Collectors.toMap(
-                Map.Entry::getKey,
+                Entry::getKey,
                 e -> latestOffsetsFromKafka != null
                      && latestOffsetsFromKafka.get(e.getKey()) != null
                      && e.getValue() != null
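
Note: every hunk above renames Map.Entry references to the bare nested type Entry, including the method references Entry::getKey and Entry::getValue. This only compiles if KafkaSupervisor.java imports java.util.Map.Entry directly; that import line is not visible in the hunks shown, so presumably it is added (or already present) elsewhere in the patch. A minimal, self-contained sketch of the pattern, using hypothetical class and variable names rather than anything from the Druid source:

import java.util.HashMap;
import java.util.Map;
// The single-type import below is what lets call sites say "Entry" instead of "Map.Entry".
import java.util.Map.Entry;

public class EntryImportSketch
{
  public static void main(String[] args)
  {
    // Hypothetical stand-in for a partition -> offset map like the supervisor's partitionOffsetMap.
    Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 42L);
    offsets.put(1, 7L);

    // Before the patch: for (Map.Entry<Integer, Long> entry : offsets.entrySet()) { ... }
    // With the import in place, the shorter form is equivalent:
    for (Entry<Integer, Long> entry : offsets.entrySet()) {
      System.out.printf("partition %d -> offset %d%n", entry.getKey(), entry.getValue());
    }
  }
}

The change is purely cosmetic: behavior is unchanged, and the nested-type import simply shortens the many entry-iteration loops and toMap collectors throughout the supervisor.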