Using Entry directly instead of Map.Entry in KafkaSupervisor #6291

Merged · 1 commit · Sep 27, 2018
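The diff below is mechanical: every qualified `Map.Entry` becomes the unqualified `Entry`. For that to compile, the class needs a single-type import of the nested interface; the import hunk isn't shown on this page, so the sketch below (illustrative names, not from the PR) just demonstrates the two equivalent forms:

```java
import java.util.Map;
import java.util.Map.Entry; // single-type import of the nested interface

class EntryImportSketch
{
  static void printAll(Map<Integer, Long> offsets)
  {
    // Fully qualified: compiles with only java.util.Map imported.
    for (Map.Entry<Integer, Long> e : offsets.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
    // Unqualified: identical bytecode, less noise; requires the Map.Entry import above.
    for (Entry<Integer, Long> e : offsets.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```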
@@ -793,7 +793,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
// defend against consecutive reset requests from replicas
// as well as the case where the metadata store does not have an entry for the reset partitions
boolean doReset = false;
- for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
+ for (Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
@@ -876,7 +876,7 @@ void gracefulShutdownInternal() throws ExecutionException, InterruptedException,
// checkTaskDuration() will be triggered. This is better than just telling these tasks to publish whatever they
// have, as replicas that are supposed to publish the same segment may not have read the same set of offsets.
for (TaskGroup taskGroup : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
if (taskInfoProvider.getTaskLocation(entry.getKey()).equals(TaskLocation.unknown())) {
killTask(entry.getKey());
} else {
@@ -915,7 +915,7 @@ String generateSequenceName(
{
StringBuilder sb = new StringBuilder();

- for (Map.Entry<Integer, Long> entry : startPartitions.entrySet()) {
+ for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
sb.append(StringUtils.format("+%d(%d)", entry.getKey(), entry.getValue()));
}
String partitionOffsetStr = sb.toString().substring(1);
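For reference, the loop above emits one `+partition(offset)` fragment per entry and `substring(1)` drops the leading `+`. A self-contained sketch of the resulting format, using the JDK's `String.format` as a stand-in for Druid's `StringUtils.format` and a `TreeMap` so the iteration order is predictable (both substitutions are illustrative assumptions):

```java
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

class SequenceNameSketch
{
  public static void main(String[] args)
  {
    Map<Integer, Long> startPartitions = new TreeMap<>();
    startPartitions.put(0, 100L);
    startPartitions.put(1, 250L);

    StringBuilder sb = new StringBuilder();
    for (Entry<Integer, Long> entry : startPartitions.entrySet()) {
      sb.append(String.format("+%d(%d)", entry.getKey(), entry.getValue()));
    }
    // Prints "0(100)+1(250)" -- the leading '+' is stripped.
    System.out.println(sb.toString().substring(1));
  }
}
```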
@@ -1083,7 +1083,7 @@ public Boolean apply(KafkaIndexTask.Status status)
// existing) so that the next tasks will start reading from where this task left off
Map<Integer, Long> publishingTaskEndOffsets = taskClient.getEndOffsets(taskId);

- for (Map.Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
+ for (Entry<Integer, Long> entry : publishingTaskEndOffsets.entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();
ConcurrentHashMap<Integer, Long> partitionOffsets = partitionGroups.get(
@@ -1375,7 +1375,7 @@ private void updateTaskStatus() throws ExecutionException, InterruptedException,

// update status (and startTime if unknown) of current tasks in taskGroups
for (TaskGroup group : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();

@@ -1418,7 +1418,7 @@ public Boolean apply(@Nullable DateTime startTime)
// update status of pending completion tasks in pendingCompletionTaskGroups
for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
for (TaskGroup group : taskGroups) {
- for (Map.Entry<String, TaskData> entry : group.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : group.tasks.entrySet()) {
entry.getValue().status = taskStorage.getStatus(entry.getKey()).get();
}
}
@@ -1441,7 +1441,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
final List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
final List<Integer> futureGroupIds = Lists.newArrayList();

- for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+ for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();

@@ -1474,7 +1474,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);

// set endOffsets as the next startOffsets
- for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
+ for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
}
} else {
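The loop just shown hands the finishing tasks' end offsets to `partitionGroups`, so the next generation of tasks starts exactly where this one stopped. A minimal sketch of that handoff in isolation (the map shapes mirror the supervisor's fields; the literal values are made up):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

class OffsetHandoffSketch
{
  public static void main(String[] args)
  {
    // groupId -> (partition -> next start offset)
    Map<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new HashMap<>();
    partitionGroups.put(0, new ConcurrentHashMap<>());

    // End offsets reported by the tasks of group 0 that just stopped reading.
    Map<Integer, Long> endOffsets = new HashMap<>();
    endOffsets.put(0, 500L);
    endOffsets.put(1, 750L);

    // Set endOffsets as the next startOffsets, as in the loop above.
    for (Entry<Integer, Long> entry : endOffsets.entrySet()) {
      partitionGroups.get(0).put(entry.getKey(), entry.getValue());
    }
    System.out.println(partitionGroups); // {0={0=500, 1=750}}
  }
}
```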
@@ -1500,9 +1500,9 @@ private ListenableFuture<Map<Integer, Long>> checkpointTaskGroup(final TaskGroup
{
if (finalize) {
// 1) Check if any task completed (in which case we're done) and kill unassigned tasks
- Iterator<Map.Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> i = taskGroup.tasks.entrySet().iterator();
while (i.hasNext()) {
- Map.Entry<String, TaskData> taskEntry = i.next();
+ Entry<String, TaskData> taskEntry = i.next();
String taskId = taskEntry.getKey();
TaskData task = taskEntry.getValue();

@@ -1564,7 +1564,7 @@ public Map<Integer, Long> apply(List<Map<Integer, Long>> input)
taskGroup.tasks.remove(taskId);

} else { // otherwise build a map of the highest offsets seen
- for (Map.Entry<Integer, Long> offset : result.entrySet()) {
+ for (Entry<Integer, Long> offset : result.entrySet()) {
if (!endOffsets.containsKey(offset.getKey())
|| endOffsets.get(offset.getKey()).compareTo(offset.getValue()) < 0) {
endOffsets.put(offset.getKey(), offset.getValue());
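The `containsKey`/`compareTo` pair above is the usual "keep the highest value per key" update. On Java 8 the same step can be written with `Map.merge`; this equivalence is an observation, not part of the PR:

```java
import java.util.HashMap;
import java.util.Map;

class MaxOffsetMergeSketch
{
  public static void main(String[] args)
  {
    Map<Integer, Long> endOffsets = new HashMap<>();
    long[][] reported = {{0, 100}, {1, 50}, {0, 120}}; // (partition, offset) pairs from several tasks

    for (long[] r : reported) {
      // If the key is absent, insert; otherwise keep the larger of old and new.
      endOffsets.merge((int) r[0], r[1], Long::max);
    }
    System.out.println(endOffsets); // {0=120, 1=50}
  }
}
```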
@@ -1642,7 +1642,7 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
{
List<ListenableFuture<?>> futures = Lists.newArrayList();

- for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {
+ for (Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingGroupList : pendingCompletionTaskGroups.entrySet()) {

boolean stopTasksInTaskGroup = false;
Integer groupId = pendingGroupList.getKey();
@@ -1723,9 +1723,9 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte
private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException
{
List<ListenableFuture<?>> futures = Lists.newArrayList();
- Iterator<Map.Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
+ Iterator<Entry<Integer, TaskGroup>> iTaskGroups = taskGroups.entrySet().iterator();
while (iTaskGroups.hasNext()) {
- Map.Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
+ Entry<Integer, TaskGroup> taskGroupEntry = iTaskGroups.next();
Integer groupId = taskGroupEntry.getKey();
TaskGroup taskGroup = taskGroupEntry.getValue();

@@ -1737,9 +1737,9 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep

log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());

- Iterator<Map.Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
+ Iterator<Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
while (iTasks.hasNext()) {
- Map.Entry<String, TaskData> task = iTasks.next();
+ Entry<String, TaskData> task = iTasks.next();
String taskId = task.getKey();
TaskData taskData = task.getValue();

@@ -1810,7 +1810,7 @@ void createNewTasks() throws JsonProcessingException

// iterate through all the current task groups and make sure each one has the desired number of replica tasks
boolean createdTask = false;
- for (Map.Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
+ for (Entry<Integer, TaskGroup> entry : taskGroups.entrySet()) {
TaskGroup taskGroup = entry.getValue();
Integer groupId = entry.getKey();

@@ -1898,7 +1898,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
private ImmutableMap<Integer, Long> generateStartingOffsetsForPartitionGroup(int groupId)
{
ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
- for (Map.Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
+ for (Entry<Integer, Long> entry : partitionGroups.get(groupId).entrySet()) {
Integer partition = entry.getKey();
Long offset = entry.getValue();

@@ -2023,7 +2023,7 @@ private ListenableFuture<?> stopTasksInGroup(@Nullable TaskGroup taskGroup)
}

final List<ListenableFuture<Void>> futures = Lists.newArrayList();
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
final String taskId = entry.getKey();
final TaskData taskData = entry.getValue();
if (taskData.status == null) {
@@ -2108,7 +2108,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in

try {
for (TaskGroup taskGroup : taskGroups.values()) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
@@ -2135,7 +2135,7 @@ private SupervisorReport<KafkaSupervisorReportPayload> generateReport(boolean in

for (List<TaskGroup> taskGroups : pendingCompletionTaskGroups.values()) {
for (TaskGroup taskGroup : taskGroups) {
- for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
+ for (Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
@Nullable DateTime startTime = entry.getValue().startTime;
Map<Integer, Long> currentOffsets = entry.getValue().currentOffsets;
@@ -2203,7 +2203,7 @@ private Map<Integer, Long> getHighestCurrentOffsets()
.stream()
.flatMap(taskGroup -> taskGroup.tasks.entrySet().stream())
.flatMap(taskData -> taskData.getValue().currentOffsets.entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));
}
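The `Long::max` argument to `Collectors.toMap` is the merge function: it decides what happens when two entries collide on the same key, which is exactly what replica tasks reporting the same partition produce. A self-contained sketch of that collision handling (the replica maps are fabricated for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class HighestOffsetsSketch
{
  public static void main(String[] args)
  {
    Map<Integer, Long> replicaA = new HashMap<>();
    replicaA.put(0, 100L);
    replicaA.put(1, 300L);
    Map<Integer, Long> replicaB = new HashMap<>();
    replicaB.put(0, 150L);
    replicaB.put(1, 250L);

    // Both replicas report both partitions; Long::max keeps the higher offset per key.
    Map<Integer, Long> highest = Stream.of(replicaA, replicaB)
        .flatMap(m -> m.entrySet().stream())
        .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Long::max));

    System.out.println(highest); // {0=150, 1=300}
  }
}
```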

private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
@@ -2213,7 +2213,7 @@ private Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
.stream()
.collect(
Collectors.toMap(
- Map.Entry::getKey,
+ Entry::getKey,
e -> latestOffsetsFromKafka != null
&& latestOffsetsFromKafka.get(e.getKey()) != null
&& e.getValue() != null
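The final hunk is cut off before the ternary's branches, so the exact lag expression and its fallback aren't visible on this page. A minimal sketch of a per-partition lag computation of the same shape, assuming lag is latest minus current when both sides are known and `0L` otherwise (that fallback is an assumption, not taken from the diff):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

class LagPerPartitionSketch
{
  static Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets, Map<Integer, Long> latestOffsetsFromKafka)
  {
    return currentOffsets
        .entrySet()
        .stream()
        .collect(
            Collectors.toMap(
                Entry::getKey,
                e -> latestOffsetsFromKafka != null
                     && latestOffsetsFromKafka.get(e.getKey()) != null
                     && e.getValue() != null
                     ? latestOffsetsFromKafka.get(e.getKey()) - e.getValue()
                     : 0L // assumed fallback; the real branch is truncated above
            )
        );
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> current = new HashMap<>();
    current.put(0, 100L);
    Map<Integer, Long> latest = new HashMap<>();
    latest.put(0, 160L);
    System.out.println(getLagPerPartition(current, latest)); // {0=60}
  }
}
```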