diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index 5d48fe19405b..4878da24df0a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -26,6 +26,7 @@ import io.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Map; public class KafkaIOConfig implements IOConfig @@ -33,6 +34,8 @@ public class KafkaIOConfig implements IOConfig private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; + @Nullable + private final Integer taskGroupId; private final String baseSequenceName; private final KafkaPartitions startPartitions; private final KafkaPartitions endPartitions; @@ -44,6 +47,7 @@ public class KafkaIOConfig implements IOConfig @JsonCreator public KafkaIOConfig( + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compatibility + @JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("startPartitions") KafkaPartitions startPartitions, @JsonProperty("endPartitions") KafkaPartitions endPartitions, @@ -54,6 +58,7 @@ public KafkaIOConfig( @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps ) { + this.taskGroupId = taskGroupId; this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions"); this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); @@ -83,6 +88,13 @@ public KafkaIOConfig( } } + @Nullable + @JsonProperty + public Integer getTaskGroupId() + { + return taskGroupId; + } + @JsonProperty public String getBaseSequenceName() { @@ -135,7 +147,8 @@ public boolean isSkipOffsetGaps() public String toString() { return "KafkaIOConfig{" + - "baseSequenceName='" + baseSequenceName + '\'' + + "taskGroupId=" + taskGroupId + + ", baseSequenceName='" + baseSequenceName + '\'' + ", startPartitions=" + startPartitions + ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 16a67ec6b06c..5f039c0336b4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -702,12 +702,13 @@ public void onFailure(Throwable t) sequences ); requestPause(); - if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( + final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( getDataSource(), - ioConfig.getBaseSequenceName(), + ioConfig.getTaskGroupId(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) - ))) { + ); + if (!toolbox.getTaskActionClient().submit(checkpointAction)) { throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets); } } diff --git
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 454deffe886d..cb3352d94026 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - @VisibleForTesting - static class TaskGroup + private static class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -159,7 +158,7 @@ static class TaskGroup DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>(); - public TaskGroup( + TaskGroup( ImmutableMap<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime, Optional<DateTime> maximumMessageTime ) @@ -171,7 +170,7 @@ public TaskGroup( this.sequenceOffsets.put(0, partitionOffsets); } - public int addNewCheckpoint(Map<Integer, Long> checkpoint) + int addNewCheckpoint(Map<Integer, Long> checkpoint) { sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint); return sequenceOffsets.lastKey(); @@ -212,9 +211,6 @@ private static class TaskData private final ConcurrentHashMap<Integer, ConcurrentHashMap<Integer, Long>> partitionGroups = new ConcurrentHashMap<>(); // -------------------------------------------------------- - // BaseSequenceName -> TaskGroup - private final ConcurrentHashMap<String, TaskGroup> sequenceTaskGroup = new ConcurrentHashMap<>(); - private final TaskStorage taskStorage; private final TaskMaster taskMaster; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; @@ -494,13 +490,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - String sequenceName, - DataSourceMetadata previousCheckpoint, - DataSourceMetadata currentCheckpoint - ) + public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) { - Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name"); + Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); Preconditions.checkArgument( ioConfig.getTopic() @@ -511,12 +503,14 @@ public void checkpoint( ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName); - notices.add(new CheckpointNotice( - sequenceName, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint - )); + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + notices.add( + new CheckpointNotice( + taskGroupId, + (KafkaDataSourceMetadata) previousCheckpoint, + (KafkaDataSourceMetadata) currentCheckpoint + ) + ); } public void possiblyRegisterListener() @@ -618,17 +612,17 @@ public void handle() private class CheckpointNotice implements Notice { - final String
sequenceName; + final int taskGroupId; final KafkaDataSourceMetadata previousCheckpoint; final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - String sequenceName, + int taskGroupId, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.sequenceName = sequenceName; + this.taskGroupId = taskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -639,17 +633,12 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - Preconditions.checkNotNull( - sequenceTaskGroup.get(sequenceName), - "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]", - sequenceName, - sequenceTaskGroup, - taskGroups - ); - final TreeMap<Integer, Map<Integer, Long>> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets; + final TaskGroup taskGroup = taskGroups.get(taskGroupId); + + if (isValidTaskGroup(taskGroup)) { + final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets; - // check validity of previousCheckpoint if it is not null - if (previousCheckpoint != null) { + // check validity of previousCheckpoint int index = checkpoints.size(); for (int sequenceId : checkpoints.descendingKeySet()) { Map<Integer, Long> checkpoint = checkpoints.get(sequenceId); @@ -666,26 +655,39 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - } else { - // There cannot be more than one checkpoint when previous checkpoint is null - // as when the task starts they are sent existing checkpoints - Preconditions.checkState( - checkpoints.size() <= 1, - "Got checkpoint request with null as previous check point, however found more than one checkpoints" + final int taskGroupId = getTaskGroupIdForPartition( + currentCheckpoint.getKafkaPartitions() + .getPartitionOffsetMap() + .keySet() + .iterator() + .next() ); - if (checkpoints.size() == 1) { - log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0)); - return; + final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); + taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); + log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); + } + } + + private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + { + if (taskGroup == null) { + // taskGroup might be in pendingCompletionTaskGroups or partitionGroups + if (pendingCompletionTaskGroups.containsKey(taskGroupId)) { + log.warn( + "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for " + + "publishing segments", + taskGroupId + ); + return false; + } else if (partitionGroups.containsKey(taskGroupId)) { + log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId); + return false; + } else { + throw new ISE("WTH?!
cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups); } } - final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next()); - final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); - sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint); - log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName); + + return true; } } @@ -699,7 +701,6 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) taskGroups.values().forEach(this::killTasksInGroup); taskGroups.clear(); partitionGroups.clear(); - sequenceTaskGroup.clear(); } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass()); } else { @@ -759,8 +760,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> { final int groupId = getTaskGroupIdForPartition(partition); killTaskGroupForPartitions(ImmutableSet.of(partition)); - final TaskGroup removedGroup = taskGroups.remove(groupId); - sequenceTaskGroup.remove(generateSequenceName(removedGroup)); + taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET); }); } else { @@ -936,9 +936,10 @@ private void updatePartitionDataFromKafka() for (int partition = 0; partition < numPartitions; partition++) { int taskGroupId = getTaskGroupIdForPartition(partition); - partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap<Integer, Long>()); - - ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.get(taskGroupId); + ConcurrentHashMap<Integer, Long> partitionMap = partitionGroups.computeIfAbsent( + taskGroupId, + k -> new ConcurrentHashMap<>() + ); // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new task group // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting @@ -1068,23 +1069,21 @@ public Boolean apply(KafkaIndexTask.Status status) } return false; } else { - final TaskGroup taskGroup = new TaskGroup( - ImmutableMap.copyOf( - kafkaTask.getIOConfig() - .getStartPartitions() - .getPartitionOffsetMap() - ), kafkaTask.getIOConfig().getMinimumMessageTime(), - kafkaTask.getIOConfig().getMaximumMessageTime() - ); - if (taskGroups.putIfAbsent( + final TaskGroup taskGroup = taskGroups.computeIfAbsent( taskGroupId, - taskGroup - ) == null) { - sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId)); - log.info("Created new task group [%d]", taskGroupId); - } + k -> { + log.info("Creating a new task group for taskGroupId[%d]", taskGroupId); + return new TaskGroup( + ImmutableMap.copyOf( + kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() + ), + kafkaTask.getIOConfig().getMinimumMessageTime(), + kafkaTask.getIOConfig().getMaximumMessageTime() + ); + } + ); taskGroupsToVerify.add(taskGroupId); - taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData()); + taskGroup.tasks.putIfAbsent(taskId, new TaskData()); } } return true; @@ -1237,7 +1236,6 @@ public void onFailure(Throwable t) // killing all tasks or no task left in the group ?
// clear state about the taskgroup so that get latest offset information is fetched from metadata store log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId); - sequenceTaskGroup.remove(generateSequenceName(taskGroup)); taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } @@ -1262,9 +1260,10 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( Map startingPartitions ) { - pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList()); - - CopyOnWriteArrayList taskGroupList = pendingCompletionTaskGroups.get(groupId); + final CopyOnWriteArrayList taskGroupList = pendingCompletionTaskGroups.computeIfAbsent( + groupId, + k -> new CopyOnWriteArrayList<>() + ); for (TaskGroup taskGroup : taskGroupList) { if (taskGroup.partitionOffsets.equals(startingPartitions)) { if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) { @@ -1392,8 +1391,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException if (endOffsets != null) { // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout()); - pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList()); - pendingCompletionTaskGroups.get(groupId).add(group); + pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group); // set endOffsets as the next startOffsets for (Map.Entry entry : endOffsets.entrySet()) { @@ -1413,7 +1411,6 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } - sequenceTaskGroup.remove(generateSequenceName(group)); // remove this task group from the list of current task groups now that it has been handled taskGroups.remove(groupId); } @@ -1437,7 +1434,8 @@ private ListenableFuture> checkpointTaskGroup(final int group // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing // failed and we need to re-ingest) return Futures.transform( - stopTasksInGroup(taskGroup), new Function>() + stopTasksInGroup(taskGroup), + new Function>() { @Nullable @Override @@ -1606,15 +1604,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId); } else { log.makeAlert( - "No task in [%s] succeeded before the completion timeout elapsed [%s]!", + "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!", group.taskIds(), + groupId, ioConfig.getCompletionTimeout() ).emit(); } // reset partitions offsets for this task group so that they will be re-read from metadata storage partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); - sequenceTaskGroup.remove(generateSequenceName(group)); // kill all the tasks in this pending completion group killTasksInGroup(group); // set a flag so the other pending completion groups for this set of partitions will also stop @@ -1674,7 +1672,6 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep // be recreated with the next set of offsets if (taskData.status.isSuccess()) { futures.add(stopTasksInGroup(taskGroup)); - sequenceTaskGroup.remove(generateSequenceName(taskGroup)); iTaskGroups.remove(); break; } @@ -1716,7 
+1713,6 @@ void createNewTasks() throws JsonProcessingException groupId, taskGroup ); - sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId)); } } @@ -1759,6 +1755,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull(); KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( + groupId, sequenceName, new KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), @@ -1924,7 +1921,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) } } - private ListenableFuture stopTasksInGroup(TaskGroup taskGroup) + private ListenableFuture stopTasksInGroup(@Nullable TaskGroup taskGroup) { if (taskGroup == null) { return Futures.immediateFuture(null); @@ -2196,4 +2193,26 @@ Runnable updateCurrentAndLatestOffsets() } }; } + + @VisibleForTesting + @Nullable + TaskGroup removeTaskGroup(int taskGroupId) + { + return taskGroups.remove(taskGroupId); + } + + @VisibleForTesting + void moveTaskGroupToPendingCompletion(int taskGroupId) + { + final TaskGroup taskGroup = taskGroups.remove(taskGroupId); + if (taskGroup != null) { + pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup); + } + } + + @VisibleForTesting + int getNoticesQueueSize() + { + return notices.size(); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 22a792f7e6f2..91335ab8ceeb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -50,6 +50,7 @@ public void testSerdeWithDefaults() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -82,6 +83,7 @@ public void testSerdeWithNonDefaults() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -118,6 +120,7 @@ public void testBaseSequenceNameRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -137,6 +140,7 @@ public void testStartPartitionsRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -156,6 +160,7 @@ public void 
testEndPartitionsRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -175,6 +180,7 @@ public void testConsumerPropertiesRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -194,6 +200,7 @@ public void testStartAndEndTopicMatch() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -214,6 +221,7 @@ public void testStartAndEndPartitionSetMatch() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n" @@ -234,6 +242,7 @@ public void testEndOffsetGreaterThanStart() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 4232b582217c..2a852d4a8ad9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -238,17 +238,17 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported) private static List> generateRecords(String topic) { return ImmutableList.of( - new ProducerRecord(topic, 0, null, JB("2008", "a", "y", 1.0f)), - new ProducerRecord(topic, 0, null, JB("2009", "b", "y", 1.0f)), - new ProducerRecord(topic, 0, null, JB("2010", "c", "y", 1.0f)), - new ProducerRecord(topic, 0, null, JB("2011", "d", "y", 1.0f)), - new ProducerRecord(topic, 0, null, JB("2011", "e", "y", 1.0f)), - new ProducerRecord(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)), - new ProducerRecord(topic, 0, null, StringUtils.toUtf8("unparseable")), - new ProducerRecord(topic, 0, null, null), - new ProducerRecord(topic, 0, null, JB("2013", "f", "y", 1.0f)), - new ProducerRecord(topic, 1, null, JB("2012", "g", "y", 1.0f)), - new ProducerRecord(topic, 1, null, JB("2011", "h", "y", 1.0f)) + new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", 1.0f)), + new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", 1.0f)), + new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", 
1.0f)), + new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", 1.0f)), + new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", 1.0f)), + new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, null), + new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", 1.0f)), + new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", 1.0f)), + new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", 1.0f)) ); } @@ -345,6 +345,7 @@ public void testRunAfterDataInserted() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -386,6 +387,7 @@ public void testRunBeforeDataInserted() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -461,6 +463,7 @@ public void testIncrementalHandOff() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, baseSequenceName, startPartitions, endPartitions, @@ -482,14 +485,16 @@ public void testIncrementalHandOff() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); - Assert.assertTrue(checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - baseSequenceName, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) ) - )); + ); // Check metrics Assert.assertEquals(8, task.getFireDepartmentMetrics().processed()); @@ -528,6 +533,7 @@ public void testRunWithMinimumMessageTime() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -581,6 +587,7 @@ public void testRunWithMaximumMessageTime() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -644,6 +651,7 @@ public void testRunWithTransformSpec() throws Exception ) ), new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -703,6 +711,7 @@ public void testRunOnNothing() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), @@ -743,6 +752,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -794,6 +804,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new 
KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -844,6 +855,7 @@ public void testReportParseExceptions() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 7L)), @@ -876,6 +888,7 @@ public void testRunReplicas() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -889,6 +902,7 @@ public void testRunReplicas() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -942,6 +956,7 @@ public void testRunConflicting() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -955,6 +970,7 @@ public void testRunConflicting() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), @@ -1009,6 +1025,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1022,6 +1039,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), @@ -1081,6 +1099,7 @@ public void testRunOneTaskTwoPartitions() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)), @@ -1145,6 +1164,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1158,6 +1178,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(1, 1L)), @@ -1213,6 +1234,7 @@ public void testRestore() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1249,6 +1271,7 @@ public void testRestore() throws Exception final KafkaIndexTask task2 = createTask( task1.getId(), new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1300,6 +1323,7 @@ public void testRunWithPauseAndResume() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ 
-1383,6 +1407,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1421,6 +1446,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 200L)), new KafkaPartitions(topic, ImmutableMap.of(0, 500L)), @@ -1473,6 +1499,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", // task should ignore these and use sequence info sent in the context new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), @@ -1749,18 +1776,20 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String sequenceName, + int taskGroupId, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) { log.info("Adding checkpoint hash to the set"); - checkpointRequestsHash.add(Objects.hash( - supervisorId, - sequenceName, - previousDataSourceMetadata, - currentDataSourceMetadata - )); + checkpointRequestsHash.add( + Objects.hash( + supervisorId, + taskGroupId, + previousDataSourceMetadata, + currentDataSourceMetadata + ) + ); return true; } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 60a47f0d58c0..2add456df8e6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.base.Optional; @@ -29,13 +30,11 @@ import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; -import io.druid.java.util.common.parsers.JSONPathFieldSpec; -import io.druid.java.util.common.parsers.JSONPathSpec; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskLocation; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; @@ -60,6 +59,9 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.parsers.JSONPathFieldSpec; +import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.TestHelper; @@ -68,6 +70,7 @@ import 
io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.server.metrics.DruidMonitorSchedulerConfig; +import io.druid.server.metrics.ExceptionCapturingServiceEmitter; import io.druid.server.metrics.NoopServiceEmitter; import org.apache.curator.test.TestingCluster; import org.apache.kafka.clients.producer.KafkaProducer; @@ -97,7 +100,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static org.easymock.EasyMock.anyBoolean; import static org.easymock.EasyMock.anyObject; @@ -138,6 +143,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private KafkaIndexTaskClient taskClient; private TaskQueue taskQueue; private String topic; + private ExceptionCapturingServiceEmitter serviceEmitter; private static String getTopic() { @@ -204,6 +210,8 @@ public void setupTest() throws Exception ); topic = getTopic(); + serviceEmitter = new ExceptionCapturingServiceEmitter(); + EmittingLogger.registerEmitter(serviceEmitter); } @After @@ -544,7 +552,7 @@ public void testKillIncompatibleTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -555,7 +563,7 @@ public void testKillIncompatibleTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)), null, @@ -566,7 +574,7 @@ public void testKillIncompatibleTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)), null, @@ -577,7 +585,7 @@ public void testKillIncompatibleTasks() throws Exception Task id4 = createKafkaIndexTask( "id4", "other-datasource", - "index_kafka_testDS_d927edff33c4b3f", + 2, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -625,7 +633,9 @@ public void testKillIncompatibleTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); replayAll(); @@ -643,7 +653,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -652,7 +662,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-1", + 1, new KafkaPartitions("topic", ImmutableMap.of(1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)), null, @@ -661,7 +671,7 @@ public void testKillBadPartitionAssignment() throws Exception 
Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -670,7 +680,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id4 = createKafkaIndexTask( "id4", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)), null, @@ -679,7 +689,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id5 = createKafkaIndexTask( "id5", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -718,8 +728,12 @@ public void testKillBadPartitionAssignment() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); taskQueue.shutdown("id4"); @@ -756,10 +770,12 @@ public void testRequeueTaskWhenFailed() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .anyTimes(); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .anyTimes(); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -821,7 +837,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), now, @@ -848,7 +864,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); 
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -869,9 +887,12 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception reset(taskClient); // for the newly created replica task - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); @@ -944,10 +965,12 @@ public void testQueueNextTasksOnSuccess() throws Exception TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); // there would be 4 tasks, 2 for each task group - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1054,10 +1077,12 @@ public void testBeginPublishAndQueueNextTasks() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); replay(taskStorage, taskRunner, taskClient, taskQueue); @@ -1091,7 +1116,7 @@ public void testDiscoverExistingPublishingTask() throws Exception Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1185,7 +1210,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)), null, @@ -1277,7 +1302,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1287,7 +1312,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1325,7 +1350,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception // since id1 is publishing, so getCheckpoints wouldn't be called for it TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); replayAll(); @@ -1401,10 +1428,12 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1460,10 +1489,12 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1537,10 +1568,12 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1619,7 +1652,7 @@ public void testStopGracefully() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1629,7 +1662,7 @@ public void testStopGracefully() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1639,7 +1672,7 @@ public void testStopGracefully() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1675,8 +1708,12 @@ public void testStopGracefully() throws Exception // getCheckpoints will not be called for id1 as it is in publishing state TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1821,7 +1858,7 @@ public void testResetRunningTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1831,7 +1868,7 @@ public void testResetRunningTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1841,7 +1878,7 @@ public void testResetRunningTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, 
Long.MAX_VALUE)), null, @@ -1876,8 +1913,12 @@ public void testResetRunningTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1905,7 +1946,7 @@ public void testNoDataIngestionTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1915,7 +1956,7 @@ public void testNoDataIngestionTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1925,7 +1966,7 @@ public void testNoDataIngestionTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1955,9 +1996,15 @@ public void testNoDataIngestionTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1977,6 +2024,172 @@ public void testNoDataIngestionTasks() throws Exception verifyAll(); } + @Test(timeout = 60_000L) + public void testCheckpointForInactiveTaskGroup() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions(topic, 
ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + + final DateTime startTime = DateTimes.nowUtc(); + expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + + final TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + + final Map fakeCheckpoints = Collections.emptyMap(); + supervisor.moveTaskGroupToPendingCompletion(0); + supervisor.checkpoint( + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + + Assert.assertNull(serviceEmitter.getStackTrace()); + Assert.assertNull(serviceEmitter.getExceptionMessage()); + Assert.assertNull(serviceEmitter.getExceptionClass()); + } + + @Test(timeout = 60_000L) + public void 
testCheckpointForUnknownTaskGroup() throws InterruptedException + { + supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + + replayAll(); + + supervisor.start(); + + supervisor.checkpoint( + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + + Assert.assertNotNull(serviceEmitter.getStackTrace()); + Assert.assertEquals( + "WTH?! 
cannot find taskGroup [0] among all taskGroups [{}]", + serviceEmitter.getExceptionMessage() + ); + Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { @@ -2101,7 +2314,7 @@ private static DataSchema getDataSchema(String dataSource) private KafkaIndexTask createKafkaIndexTask( String id, String dataSource, - String sequenceName, + int taskGroupId, KafkaPartitions startPartitions, KafkaPartitions endPartitions, DateTime minimumMessageTime, @@ -2114,7 +2327,8 @@ private KafkaIndexTask createKafkaIndexTask( getDataSchema(dataSource), tuningConfig, new KafkaIOConfig( - sequenceName, + taskGroupId, + "sequenceName-" + taskGroupId, startPartitions, endPartitions, ImmutableMap.of(), @@ -2123,7 +2337,7 @@ private KafkaIndexTask createKafkaIndexTask( maximumMessageTime, false ), - ImmutableMap.of(), + Collections.emptyMap(), null, null ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index aca4a1099aef..083ef9e53551 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.DataSourceMetadata; @@ -29,21 +30,21 @@ public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - private final String sequenceName; + private final int taskGroupId; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("taskGroupId") Integer taskGroupId, @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { - this.supervisorId = supervisorId; - this.sequenceName = sequenceName; - this.previousCheckPoint = previousCheckPoint; - this.currentCheckPoint = currentCheckPoint; + this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); + this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); + this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @JsonProperty @@ -53,9 +54,9 @@ public String getSupervisorId() } @JsonProperty - public String getSequenceName() + public int getTaskGroupId() { - return sequenceName; + return taskGroupId; } @JsonProperty @@ -83,8 +84,12 @@ public Boolean perform( Task task, TaskActionToolbox toolbox ) throws IOException { - return toolbox.getSupervisorManager() - .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint); + return toolbox.getSupervisorManager().checkPointDataSourceMetadata( + supervisorId, + taskGroupId, + previousCheckPoint, + currentCheckPoint + ); } @Override @@ -98,7 +103,7 
@@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + - ", sequenceName='" + sequenceName + '\'' + + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + '}'; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 00f2c92c4b75..31e54cfd521a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -159,9 +159,9 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + int taskGroupId, + DataSourceMetadata previousDataSourceMetadata, + DataSourceMetadata currentDataSourceMetadata ) { try { @@ -172,7 +172,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java index 55c679d4991b..4d9404bb34c1 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java @@ -35,6 +35,10 @@ */ public class EmittingLogger extends Logger { + public static final String EXCEPTION_TYPE_KEY = "exceptionType"; + public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; + public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; + private static volatile ServiceEmitter emitter = null; private final String className; diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 380861d9fb8f..0ba0701e82d0 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,9 +83,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint + int taskGroupId, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint ) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 681421b48002..58e73ff3df71 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,8 +21,6 @@ import io.druid.indexing.overlord.DataSourceMetadata; -import javax.annotation.Nullable; - public interface Supervisor { void start(); @@ -45,13 
+43,9 @@ public interface Supervisor * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * - * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing + * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint - ); + void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); } diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java new file mode 100644 index 000000000000..a145046b4162 --- /dev/null +++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.metrics; + +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.core.Event; +import io.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.Map; + +public class ExceptionCapturingServiceEmitter extends ServiceEmitter +{ + private volatile Class exceptionClass; + private volatile String exceptionMessage; + private volatile String stackTrace; + + public ExceptionCapturingServiceEmitter() + { + super("", "", null); + } + + @Override + public void emit(Event event) + { + //noinspection unchecked + final Map dataMap = (Map) event.toMap().get("data"); + final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY); + if (exceptionClass != null) { + final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY); + final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY); + this.exceptionClass = exceptionClass; + this.exceptionMessage = exceptionMessage; + this.stackTrace = stackTrace; + } + } + + @Nullable + public Class getExceptionClass() + { + return exceptionClass; + } + + @Nullable + public String getExceptionMessage() + { + return exceptionMessage; + } + + @Nullable + public String getStackTrace() + { + return stackTrace; + } +}
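For context: the new tests work by letting ExceptionCapturingServiceEmitter pick the exception details out of an alert event's "data" map, keyed by the EXCEPTION_TYPE_KEY / EXCEPTION_MESSAGE_KEY / EXCEPTION_STACK_TRACE_KEY constants added to EmittingLogger in this patch. Below is a minimal sketch of that interaction, outside the patch itself; it assumes EmittingLogger.registerEmitter() and makeAlert(...).emit() behave as they do elsewhere in Druid's test suite, and the class name AlertCaptureSketch and the sample alert message are illustrative only.

import io.druid.java.util.common.ISE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.metrics.ExceptionCapturingServiceEmitter;

public class AlertCaptureSketch
{
  private static final EmittingLogger log = new EmittingLogger(AlertCaptureSketch.class);

  public static void main(String[] args)
  {
    // Route alerts built by EmittingLogger to the capturing emitter, as the supervisor tests do.
    final ExceptionCapturingServiceEmitter emitter = new ExceptionCapturingServiceEmitter();
    EmittingLogger.registerEmitter(emitter);

    try {
      // Stand-in for the failure path exercised by testCheckpointForUnknownTaskGroup.
      throw new ISE("cannot find taskGroup [%d] among all taskGroups [%s]", 0, "{}");
    }
    catch (ISE e) {
      // The supervisor emits such failures as alerts; the throwable's type, message and stack
      // trace land in the alert's "data" map under the new EmittingLogger keys, where the
      // overridden emit(Event) above records them.
      log.makeAlert(e, "Exception in checkpointing sketch").emit();
    }

    // These getters are what the new tests assert on.
    System.out.println(emitter.getExceptionClass());
    System.out.println(emitter.getExceptionMessage());
  }
}

In testCheckpointForUnknownTaskGroup the captured class and message are expected to be non-null and to match the ISE raised by the supervisor, while testCheckpointForInactiveTaskGroup asserts that they remain null because no alert was emitted.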