From f22fb2c2cf896b4b436e718548b825d7496aa470 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Tue, 1 Mar 2016 16:51:50 -0800
Subject: [PATCH] KafkaIndexTask.

Reads a specific offset range from specific partitions, and can use dataSource
metadata transactions to guarantee exactly-once ingestion. Each task has a
finite lifecycle, so it is expected that some process will be supervising
existing tasks and creating new ones when needed.
---
 distribution/pom.xml                           |    2 +
 .../kafka-indexing-service/pom.xml             |   91 ++
 .../kafka/KafkaDataSourceMetadata.java         |  123 ++
 .../druid/indexing/kafka/KafkaIOConfig.java    |  115 ++
 .../druid/indexing/kafka/KafkaIndexTask.java   |  587 ++++++++
 .../indexing/kafka/KafkaIndexTaskModule.java   |   51 +
 .../druid/indexing/kafka/KafkaPartitions.java  |   97 ++
 .../indexing/kafka/KafkaTuningConfig.java      |  147 ++
 .../io.druid.initialization.DruidModule       |    1 +
 .../kafka/KafkaDataSourceMetadataTest.java     |  101 ++
 .../indexing/kafka/KafkaIndexTaskTest.java     | 1244 +++++++++++++++++
 .../druid/indexing/kafka/test/TestBroker.java  |  114 ++
 pom.xml                                        |    1 +
 13 files changed, 2674 insertions(+)
 create mode 100644 extensions-core/kafka-indexing-service/pom.xml
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
 create mode 100644 extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
 create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
 create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
 create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java

diff --git a/distribution/pom.xml b/distribution/pom.xml
index eb6d4e20f569..2e089af916c6 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -81,6 +81,8 @@
                         <argument>-c</argument>
                         <argument>io.druid.extensions:druid-kafka-extraction-namespace</argument>
                         <argument>-c</argument>
+                        <argument>io.druid.extensions:druid-kafka-indexing-service</argument>
+                        <argument>-c</argument>
                         <argument>io.druid.extensions:mysql-metadata-storage</argument>
                         <argument>-c</argument>
                         <argument>io.druid.extensions:druid-namespace-lookup</argument>
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
new file mode 100644
index 000000000000..7b32b5bf48b0
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>io.druid.extensions</groupId>
+  <artifactId>druid-kafka-indexing-service</artifactId>
+  <name>druid-kafka-indexing-service</name>
+  <description>druid-kafka-indexing-service</description>
+
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java
new file mode 100644
index 000000000000..84b61dca2b09
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import com.metamx.common.IAE;
+import io.druid.indexing.overlord.DataSourceMetadata;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class KafkaDataSourceMetadata implements DataSourceMetadata
+{
+  private final KafkaPartitions kafkaPartitions;
+
+  @JsonCreator
+  public KafkaDataSourceMetadata(
+      @JsonProperty("partitions") KafkaPartitions kafkaPartitions
+  )
+  {
+    this.kafkaPartitions = kafkaPartitions;
+  }
+
+  @JsonProperty("partitions")
+  public KafkaPartitions getKafkaPartitions()
+  {
+    return kafkaPartitions;
+  }
+
+  @Override
+  public boolean isValidStart()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean matches(DataSourceMetadata other)
+  {
+    if (getClass() != other.getClass()) {
+      return false;
+    }
+
+    return plus(other).equals(other.plus(this));
+  }
+
+  @Override
+  public DataSourceMetadata plus(DataSourceMetadata other)
+  {
+    if (!(other instanceof KafkaDataSourceMetadata)) {
+      throw new IAE(
+          "Expected instance of %s, got %s",
+          KafkaDataSourceMetadata.class.getCanonicalName(),
+          other.getClass().getCanonicalName()
+      );
+    }
+
+    final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
+
+    if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
+      // Same topic, merge offsets.
+      final Map<Integer, Long> newMap = Maps.newHashMap();
+
+      for (Map.Entry<Integer, Long> entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) {
+        newMap.put(entry.getKey(), entry.getValue());
+      }
+
+      for (Map.Entry<Integer, Long> entry : that.getKafkaPartitions().getPartitionOffsetMap().entrySet()) {
+        newMap.put(entry.getKey(), entry.getValue());
+      }
+
+      return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap));
+    } else {
+      // Different topic, prefer "other".
+      return other;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) o;
+    return Objects.equals(kafkaPartitions, that.kafkaPartitions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(kafkaPartitions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaDataSourceMetadata{" +
+           "kafkaPartitions=" + kafkaPartitions +
+           '}';
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
new file mode 100644
index 000000000000..550465e6ef68
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.segment.indexing.IOConfig;
+
+import java.util.Map;
+
+public class KafkaIOConfig implements IOConfig
+{
+  private static final boolean DEFAULT_USE_TRANSACTION = true;
+
+  private final String sequenceName;
+  private final KafkaPartitions startPartitions;
+  private final KafkaPartitions endPartitions;
+  private final Map<String, String> consumerProperties;
+  private final boolean useTransaction;
+
+  @JsonCreator
+  public KafkaIOConfig(
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("startPartitions") KafkaPartitions startPartitions,
+      @JsonProperty("endPartitions") KafkaPartitions endPartitions,
+      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("useTransaction") Boolean useTransaction
+  )
+  {
+    this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
+    this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
+    this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
+    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+    this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
+
+    Preconditions.checkArgument(
+        startPartitions.getTopic().equals(endPartitions.getTopic()),
+        "start topic and end topic must match"
+    );
+
+    Preconditions.checkArgument(
+        startPartitions.getPartitionOffsetMap().keySet().equals(endPartitions.getPartitionOffsetMap().keySet()),
+        "start partition set and end partition set must match"
+    );
+
+    for (int partition : endPartitions.getPartitionOffsetMap().keySet()) {
+      Preconditions.checkArgument(
+          endPartitions.getPartitionOffsetMap().get(partition) >= startPartitions.getPartitionOffsetMap()
+                                                                                  .get(partition),
+          "end offset must be >= start offset for partition[%d]",
+          partition
+      );
+    }
+  }
+
+  @JsonProperty
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  @JsonProperty
+  public KafkaPartitions getStartPartitions()
+  {
+    return startPartitions;
+  }
+
+  @JsonProperty
+  public KafkaPartitions getEndPartitions()
+  {
+    return endPartitions;
+  }
+
+  @JsonProperty
+  public Map<String, String> getConsumerProperties()
+  {
+    return consumerProperties;
+  }
+
+  @JsonProperty
+  public boolean isUseTransaction()
+  {
+    return useTransaction;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaIOConfig{" +
+           "sequenceName='" + sequenceName + '\'' +
+           ", startPartitions=" + startPartitions +
+           ", endPartitions=" + endPartitions +
+           ", consumerProperties=" + consumerProperties +
+           ", useTransaction=" + useTransaction +
+           '}';
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
new file mode 100644
index 000000000000..6474ca5cd241
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.metamx.common.ISE; +import com.metamx.common.RetryUtils; +import com.metamx.common.guava.Sequence; +import com.metamx.common.logger.Logger; +import com.metamx.common.parsers.ParseException; +import io.druid.data.input.Committer; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.task.AbstractTask; +import io.druid.indexing.common.task.TaskResource; +import io.druid.query.DruidMetrics; +import io.druid.query.NoopQueryRunner; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.timeline.DataSegment; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; + +public class KafkaIndexTask extends AbstractTask +{ + private static final Logger log = new Logger(KafkaIndexTask.class); + private static final String TYPE = "index_kafka"; + private static final Random RANDOM = new Random(); + private static final long POLL_TIMEOUT = 100; + private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; + + private final DataSchema dataSchema; + private final InputRowParser parser; + private final KafkaTuningConfig tuningConfig; + private final KafkaIOConfig ioConfig; + + private volatile Appenderator 
appenderator = null; + private volatile FireDepartmentMetrics fireDepartmentMetrics = null; + private volatile boolean startedReading = false; + private volatile boolean stopping = false; + private volatile boolean publishing = false; + private volatile Thread runThread = null; + + @JsonCreator + public KafkaIndexTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig, + @JsonProperty("ioConfig") KafkaIOConfig ioConfig, + @JsonProperty("context") Map context + ) + { + super( + id == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : id, + String.format("%s_%s", TYPE, dataSchema.getDataSource()), + taskResource, + dataSchema.getDataSource(), + context + ); + + this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); + this.parser = Preconditions.checkNotNull((InputRowParser) dataSchema.getParser(), "parser"); + this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); + this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + } + + private static String makeTaskId(String dataSource, int randomBits) + { + final StringBuilder suffix = new StringBuilder(8); + for (int i = 0; i < Ints.BYTES * 2; ++i) { + suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F))); + } + return Joiner.on("_").join(TYPE, dataSource, suffix); + } + + @Override + public String getType() + { + return TYPE; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) throws Exception + { + return true; + } + + @JsonProperty + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty + public KafkaTuningConfig getTuningConfig() + { + return tuningConfig; + } + + @JsonProperty("ioConfig") + public KafkaIOConfig getIOConfig() + { + return ioConfig; + } + + /** + * Public for tests. + */ + @JsonIgnore + public boolean hasStartedReading() + { + return startedReading; + } + + @Override + public TaskStatus run(final TaskToolbox toolbox) throws Exception + { + log.info("Starting up!"); + + runThread = Thread.currentThread(); + + // Set up FireDepartmentMetrics + final FireDepartment fireDepartmentForMetrics = new FireDepartment( + dataSchema, + new RealtimeIOConfig(null, null, null), + null + ); + fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); + toolbox.getMonitorScheduler().addMonitor( + new RealtimeMetricsMonitor( + ImmutableList.of(fireDepartmentForMetrics), + ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()}) + ) + ); + + try ( + final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); + final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox); + final KafkaConsumer consumer = newConsumer() + ) { + appenderator = appenderator0; + + final String topic = ioConfig.getStartPartitions().getTopic(); + + // Start up, set up initial offsets. + final Object restoredMetadata = driver.startJob(); + final Map nextOffsets = Maps.newHashMap(); + if (restoredMetadata == null) { + nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap()); + } else { + final Map restoredMetadataMap = (Map) restoredMetadata; + final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue( + restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); + nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap()); + + // Sanity checks. 
+ if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) { + throw new ISE( + "WTF?! Restored topic[%s] but expected topic[%s]", + restoredNextPartitions.getTopic(), + ioConfig.getStartPartitions().getTopic() + ); + } + + if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) { + throw new ISE( + "WTF?! Restored partitions[%s] but expected partitions[%s]", + nextOffsets.keySet(), + ioConfig.getStartPartitions().getPartitionOffsetMap().keySet() + ); + } + } + + // Set up committer. + final Supplier committerSupplier = new Supplier() + { + @Override + public Committer get() + { + final Map snapshot = ImmutableMap.copyOf(nextOffsets); + + return new Committer() + { + @Override + public Object getMetadata() + { + return ImmutableMap.of( + METADATA_NEXT_PARTITIONS, new KafkaPartitions( + ioConfig.getStartPartitions().getTopic(), + snapshot + ) + ); + } + + @Override + public void run() + { + // Do nothing. + } + }; + } + }; + + // Initialize consumer assignment. + final Set assignment = Sets.newHashSet(); + for (Map.Entry entry : nextOffsets.entrySet()) { + final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(entry.getKey()); + if (entry.getValue() < endOffset) { + assignment.add(entry.getKey()); + } else if (entry.getValue() == endOffset) { + log.info("Finished reading partition[%d].", entry.getKey()); + } else { + throw new ISE( + "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", + entry.getValue(), + endOffset + ); + } + } + + assignPartitions(consumer, topic, assignment); + + // Seek to starting offsets. + for (final int partition : assignment) { + final long offset = nextOffsets.get(partition); + log.info("Seeking partition[%d] to offset[%,d].", partition, offset); + consumer.seek(new TopicPartition(topic, partition), offset); + } + + // Main loop. + // Could eventually support early termination (triggered by a supervisor) + // Could eventually support leader/follower mode (for keeping replicas more in sync) + boolean stillReading = true; + while (stillReading) { + if (stopping) { + log.info("Stopping early."); + break; + } + + // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to + // offset is not present in the topic-partition. This can happen if we're asking a task to read from data + // that has not been written yet (which is totally legitimate). So let's wait for it to show up. + final ConsumerRecords records = RetryUtils.retry( + new Callable>() + { + @Override + public ConsumerRecords call() throws Exception + { + try { + return consumer.poll(POLL_TIMEOUT); + } + finally { + startedReading = true; + } + } + }, + new Predicate() + { + @Override + public boolean apply(Throwable input) + { + return input instanceof OffsetOutOfRangeException; + } + }, + Integer.MAX_VALUE + ); + + for (ConsumerRecord record : records) { + if (log.isTraceEnabled()) { + log.trace( + "Got topic[%s] partition[%d] offset[%,d].", + record.topic(), + record.partition(), + record.offset() + ); + } + + if (record.offset() < ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition())) { + if (record.offset() != nextOffsets.get(record.partition())) { + throw new ISE( + "WTF?! 
Got offset[%,d] after offset[%,d] in partition[%d].", + record.offset(), + nextOffsets.get(record.partition()), + record.partition() + ); + } + + try { + final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); + final SegmentIdentifier identifier = driver.add(row, committerSupplier); + + if (identifier == null) { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. + // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. + throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + } + + fireDepartmentMetrics.incrementProcessed(); + } + catch (ParseException e) { + if (tuningConfig.isReportParseExceptions()) { + throw e; + } else { + log.debug( + e, + "Dropping unparseable row from partition[%d] offset[%,d].", + record.partition(), + record.offset() + ); + + fireDepartmentMetrics.incrementUnparseable(); + } + } + + final long nextOffset = record.offset() + 1; + final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition()); + + nextOffsets.put(record.partition(), nextOffset); + + if (nextOffset == endOffset && assignment.remove(record.partition())) { + log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); + assignPartitions(consumer, topic, assignment); + stillReading = !assignment.isEmpty(); + } + } + } + } + + // Persist pending data. + final Committer finalCommitter = committerSupplier.get(); + driver.persist(finalCommitter); + + publishing = true; + if (stopping) { + // Stopped gracefully. Exit code shouldn't matter, so fail to be on the safe side. + return TaskStatus.failure(getId()); + } + + final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() + { + @Override + public boolean publishSegments(Set segments, Object commitMetadata) throws IOException + { + // Sanity check, we should only be publishing things that match our desired end state. + if (!ioConfig.getEndPartitions().equals(((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS))) { + throw new ISE("WTF?! 
Driver attempted to publish invalid metadata[%s].", commitMetadata); + } + + final SegmentInsertAction action; + + if (ioConfig.isUseTransaction()) { + action = new SegmentInsertAction( + segments, + new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), + new KafkaDataSourceMetadata(ioConfig.getEndPartitions()) + ); + } else { + action = new SegmentInsertAction(segments, null, null); + } + + log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); + + return toolbox.getTaskActionClient().submit(action).isSuccess(); + } + }; + + final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); + if (published == null) { + throw new ISE("Transaction failure publishing segments, aborting"); + } else { + log.info( + "Published segments[%s] with metadata[%s].", + Joiner.on(", ").join( + Iterables.transform( + published.getSegments(), + new Function() + { + @Override + public String apply(DataSegment input) + { + return input.getIdentifier(); + } + } + ) + ), + published.getCommitMetadata() + ); + } + } + + return success(); + } + + @Override + public boolean canRestore() + { + return true; + } + + @Override + public void stopGracefully() + { + log.info("Stopping gracefully."); + + stopping = true; + if (publishing && runThread.isAlive()) { + log.info("stopGracefully: Run thread started publishing, interrupting it."); + runThread.interrupt(); + } + } + + @Override + public QueryRunner getQueryRunner(Query query) + { + if (appenderator == null) { + // Not yet initialized, no data yet, just return a noop runner. + return new NoopQueryRunner<>(); + } + + return new QueryRunner() + { + @Override + public Sequence run(final Query query, final Map responseContext) + { + return query.run(appenderator, responseContext); + } + }; + } + + @VisibleForTesting + public FireDepartmentMetrics getFireDepartmentMetrics() + { + return fireDepartmentMetrics; + } + + private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) + { + return Appenderators.createRealtime( + dataSchema, + tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")), + metrics, + toolbox.getSegmentPusher(), + toolbox.getObjectMapper(), + toolbox.getIndexIO(), + tuningConfig.getBuildV9Directly() ? 
toolbox.getIndexMergerV9() : toolbox.getIndexMerger(), + toolbox.getQueryRunnerFactoryConglomerate(), + toolbox.getSegmentAnnouncer(), + toolbox.getEmitter(), + toolbox.getQueryExecutorService(), + toolbox.getCache(), + toolbox.getCacheConfig() + ); + } + + private FiniteAppenderatorDriver newDriver( + final Appenderator appenderator, + final TaskToolbox toolbox + ) + { + return new FiniteAppenderatorDriver( + appenderator, + new ActionBasedSegmentAllocator( + toolbox.getTaskActionClient(), + dataSchema, + ioConfig.getSequenceName() + ), + toolbox.getSegmentHandoffNotifierFactory(), + new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + toolbox.getObjectMapper(), + tuningConfig.getMaxRowsPerSegment(), + tuningConfig.getHandoffConditionTimeout() + ); + } + + private KafkaConsumer newConsumer() + { + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + final Properties props = new Properties(); + + for (Map.Entry entry : ioConfig.getConsumerProperties().entrySet()) { + props.setProperty(entry.getKey(), entry.getValue()); + } + + props.setProperty("enable.auto.commit", "false"); + props.setProperty("auto.offset.reset", "none"); + props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); + props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + + private static void assignPartitions( + final KafkaConsumer consumer, + final String topic, + final Set partitions + ) + { + consumer.assign( + Lists.newArrayList( + Iterables.transform( + partitions, + new Function() + { + @Override + public TopicPartition apply(Integer n) + { + return new TopicPartition(topic, n); + } + } + ) + ) + ); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java new file mode 100644 index 000000000000..cce67287be50 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class KafkaIndexTaskModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule(getClass().getSimpleName()) + .registerSubtypes( + new NamedType(KafkaIndexTask.class, "index_kafka"), + new NamedType(KafkaDataSourceMetadata.class, "kafka"), + new NamedType(KafkaIOConfig.class, "kafka"), + new NamedType(KafkaTuningConfig.class, "kafka") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java new file mode 100644 index 000000000000..f0d7370bfc8f --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.Objects; + +public class KafkaPartitions +{ + private final String topic; + private final Map partitionOffsetMap; + + @JsonCreator + public KafkaPartitions( + @JsonProperty("topic") final String topic, + @JsonProperty("partitionOffsetMap") final Map partitionOffsetMap + ) + { + this.topic = topic; + this.partitionOffsetMap = ImmutableMap.copyOf(partitionOffsetMap); + + // Validate partitionOffsetMap + for (Map.Entry entry : partitionOffsetMap.entrySet()) { + Preconditions.checkArgument( + entry.getValue() >= 0, + String.format( + "partition[%d] offset[%d] invalid", + entry.getKey(), + entry.getValue() + ) + ); + } + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public Map getPartitionOffsetMap() + { + return partitionOffsetMap; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KafkaPartitions that = (KafkaPartitions) o; + return Objects.equals(topic, that.topic) && + Objects.equals(partitionOffsetMap, that.partitionOffsetMap); + } + + @Override + public int hashCode() + { + return Objects.hash(topic, partitionOffsetMap); + } + + @Override + public String toString() + { + return "KafkaPartitions{" + + "topic='" + topic + '\'' + + ", partitionOffsetMap=" + partitionOffsetMap + + '}'; + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java new file mode 100644 index 000000000000..374b2dec909c --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -0,0 +1,147 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.segment.IndexSpec; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfig; +import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import org.joda.time.Period; + +import java.io.File; + +public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig +{ + private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; + + private final int maxRowsInMemory; + private final int maxRowsPerSegment; + private final Period intermediatePersistPeriod; + private final File basePersistDirectory; + private final int maxPendingPersists; + private final IndexSpec indexSpec; + private final boolean buildV9Directly; + private final boolean reportParseExceptions; + private final long handoffConditionTimeout; + + @JsonCreator + public KafkaTuningConfig( + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, + @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") File basePersistDirectory, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, + @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("buildV9Directly") Boolean buildV9Directly, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout + ) + { + // Cannot be a static because default basePersistDirectory is unique per-instance + final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); + + this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; + this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; + this.intermediatePersistPeriod = intermediatePersistPeriod == null + ? defaults.getIntermediatePersistPeriod() + : intermediatePersistPeriod; + this.basePersistDirectory = defaults.getBasePersistDirectory(); + this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists; + this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.buildV9Directly = buildV9Directly == null ? defaults.getBuildV9Directly() : buildV9Directly; + this.reportParseExceptions = reportParseExceptions == null + ? defaults.isReportParseExceptions() + : reportParseExceptions; + this.handoffConditionTimeout = handoffConditionTimeout == null + ? 
defaults.getHandoffConditionTimeout() + : handoffConditionTimeout; + } + + @JsonProperty + public int getMaxRowsInMemory() + { + return maxRowsInMemory; + } + + @JsonProperty + public int getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + + @JsonProperty + public Period getIntermediatePersistPeriod() + { + return intermediatePersistPeriod; + } + + @JsonProperty + public File getBasePersistDirectory() + { + return basePersistDirectory; + } + + @JsonProperty + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + + @JsonProperty + public IndexSpec getIndexSpec() + { + return indexSpec; + } + + @JsonProperty + public boolean getBuildV9Directly() + { + return buildV9Directly; + } + + @JsonProperty + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + + @JsonProperty + public long getHandoffConditionTimeout() + { + return handoffConditionTimeout; + } + + public KafkaTuningConfig withBasePersistDirectory(File dir) + { + return new KafkaTuningConfig( + maxRowsInMemory, + maxRowsPerSegment, + intermediatePersistPeriod, + dir, + maxPendingPersists, + indexSpec, + buildV9Directly, + reportParseExceptions, + handoffConditionTimeout + ); + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..16aec94a8dfe --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.indexing.kafka.KafkaIndexTaskModule diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java new file mode 100644 index 000000000000..32e9b08022c1 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class KafkaDataSourceMetadataTest +{ + private static final KafkaDataSourceMetadata KM0 = KM("foo", ImmutableMap.of()); + private static final KafkaDataSourceMetadata KM1 = KM("foo", ImmutableMap.of(0, 2L, 1, 3L)); + private static final KafkaDataSourceMetadata KM2 = KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)); + private static final KafkaDataSourceMetadata KM3 = KM("foo", ImmutableMap.of(0, 2L, 2, 5L)); + + @Test + public void testMatches() + { + Assert.assertTrue(KM0.matches(KM0)); + Assert.assertTrue(KM0.matches(KM1)); + Assert.assertTrue(KM0.matches(KM2)); + Assert.assertTrue(KM0.matches(KM3)); + + Assert.assertTrue(KM1.matches(KM0)); + Assert.assertTrue(KM1.matches(KM1)); + Assert.assertFalse(KM1.matches(KM2)); + Assert.assertTrue(KM1.matches(KM3)); + + Assert.assertTrue(KM2.matches(KM0)); + Assert.assertFalse(KM2.matches(KM1)); + Assert.assertTrue(KM2.matches(KM2)); + Assert.assertTrue(KM2.matches(KM3)); + + Assert.assertTrue(KM3.matches(KM0)); + Assert.assertTrue(KM3.matches(KM1)); + Assert.assertTrue(KM3.matches(KM2)); + Assert.assertTrue(KM3.matches(KM3)); + } + + @Test + public void testIsValidStart() + { + Assert.assertTrue(KM0.isValidStart()); + Assert.assertTrue(KM1.isValidStart()); + Assert.assertTrue(KM2.isValidStart()); + Assert.assertTrue(KM3.isValidStart()); + } + + @Test + public void testPlus() + { + Assert.assertEquals( + KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), + KM1.plus(KM3) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), + KM0.plus(KM2) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), + KM1.plus(KM2) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), + KM2.plus(KM1) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), + KM2.plus(KM2) + ); + } + + private static KafkaDataSourceMetadata KM(String topic, Map offsets) + { + return new KafkaDataSourceMetadata(new KafkaPartitions(topic, offsets)); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java new file mode 100644 index 000000000000..70911cc3dd73 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -0,0 +1,1244 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.CompressionUtils; +import com.metamx.common.Granularity; +import com.metamx.common.ISE; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.MonitorScheduler; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; +import io.druid.concurrent.Execs; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.JSONPathFieldSpec; +import io.druid.data.input.impl.JSONPathSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.SegmentLoaderFactory; +import io.druid.indexing.common.TaskLock; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.kafka.test.TestBroker; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.MetadataTaskStorage; +import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.test.TestDataSegmentAnnouncer; +import io.druid.indexing.test.TestDataSegmentKiller; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.EntryExistsException; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; +import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; +import io.druid.metadata.TestDerbyConnector; +import io.druid.query.DefaultQueryRunnerFactoryConglomerate; +import io.druid.query.Druids; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; +import io.druid.query.Result; +import io.druid.query.SegmentDescriptor; +import io.druid.query.aggregation.AggregatorFactory; +import 
io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; +import io.druid.segment.column.DictionaryEncodedColumn; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPusher; +import io.druid.segment.loading.LocalDataSegmentPusherConfig; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; +import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.timeline.DataSegment; +import org.apache.curator.test.TestingCluster; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@RunWith(Parameterized.class) +public class KafkaIndexTaskTest +{ + private final boolean buildV9Directly; + private long handoffConditionTimeout = 0; + private boolean reportParseExceptions = false; + private boolean doHandoff = true; + + private TestingCluster zkServer; + private TestBroker kafkaServer; + private ServiceEmitter emitter; + private ListeningExecutorService taskExec; + private TaskToolboxFactory toolboxFactory; + private IndexerMetadataStorageCoordinator metadataStorageCoordinator; + private TaskStorage taskStorage; + private TaskLockbox taskLockbox; + private File directory; + + private final List runningTasks = Lists.newArrayList(); + + private static final Logger log = new Logger(KafkaIndexTaskTest.class); + + private static final DataSchema DATA_SCHEMA; + + private static final List> RECORDS = ImmutableList.of( + new ProducerRecord("topic0", 0, null, JB("2008", "a", "y", 1.0f)), + new ProducerRecord("topic0", 0, null, JB("2009", "b", "y", 1.0f)), + new ProducerRecord("topic0", 0, null, JB("2010", "c", "y", 1.0f)), + new ProducerRecord("topic0", 0, null, JB("2011", "d", "y", 1.0f)), + new ProducerRecord("topic0", 0, null, JB("2011", "e", "y", 1.0f)), + new ProducerRecord("topic0", 0, null, "unparseable".getBytes()), + new ProducerRecord("topic0", 0, null, JB("2013", "f", "y", 1.0f)), + new ProducerRecord("topic0", 1, null, JB("2012", "g", "y", 1.0f)) + ); + + static { + ObjectMapper objectMapper = new DefaultObjectMapper(); + DATA_SCHEMA = new DataSchema( + "test_ds", + objectMapper.convertValue( + new 
StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + ImmutableList.of("dim1", "dim2"), + null, + null + ), + new JSONPathSpec(true, ImmutableList.of()), + ImmutableMap.of() + ), + Charsets.UTF_8.name() + ), + Map.class + ), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), + objectMapper + ); + } + + @Parameterized.Parameters(name = "buildV9Directly = {0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of(new Object[]{true}, new Object[]{false}); + } + + public KafkaIndexTaskTest(boolean buildV9Directly) + { + this.buildV9Directly = buildV9Directly; + } + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); + + @Before + public void setUp() throws Exception + { + emitter = new ServiceEmitter( + "service", + "host", + new LoggingEmitter( + log, + LoggingEmitter.Level.ERROR, + new DefaultObjectMapper() + ) + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + makeToolboxFactory(); + + zkServer = new TestingCluster(1); + zkServer.start(); + + kafkaServer = new TestBroker( + zkServer.getConnectString(), + tempFolder.newFolder(), + 1, + ImmutableMap.of("num.partitions", "2") + ); + kafkaServer.start(); + + taskExec = MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + Execs.makeThreadFactory("kafka-task-test-%d") + ) + ); + + handoffConditionTimeout = 0; + reportParseExceptions = false; + doHandoff = true; + } + + @After + public void tearDown() throws Exception + { + emitter.close(); + + synchronized (runningTasks) { + for (Task task : runningTasks) { + task.stopGracefully(); + } + + runningTasks.clear(); + } + + taskExec.shutdown(); + taskExec.awaitTermination(9999, TimeUnit.DAYS); + + kafkaServer.close(); + kafkaServer = null; + + zkServer.stop(); + zkServer = null; + + destroyToolboxFactory(); + } + + @Test(timeout = 60_000L) + public void testRunAfterDataInserted() throws Exception + { + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + 
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testRunBeforeDataInserted() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for the task to start reading + while (!task.hasStartedReading()) { + Thread.sleep(10); + } + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception + { + handoffConditionTimeout = 5_000; + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exception + { + doHandoff = false; + handoffConditionTimeout = 100; + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for 
(ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.FAILED, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testReportParseExceptions() throws Exception + { + reportParseExceptions = true; + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.FAILED, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + Assert.assertEquals(ImmutableSet.of(), publishedDescriptors()); + Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())); + } + + @Test(timeout = 60_000L) + public void testRunReplicas() throws Exception + { + final KafkaIndexTask task1 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + final KafkaIndexTask task2 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future1 = runTask(task1); + final ListenableFuture future2 = runTask(task2); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Wait for tasks to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, 
future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testRunConflicting() throws Exception + { + final KafkaIndexTask task1 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + final KafkaIndexTask task2 = createTask( + null, + new KafkaIOConfig( + "sequence1", + new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Run first task + final ListenableFuture future1 = runTask(task1); + Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode()); + + // Run second task + final ListenableFuture future2 = runTask(task2); + Assert.assertEquals(TaskStatus.Status.FAILED, future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata, should all be from the first task + SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testRunConflictingWithoutTransactions() throws Exception + { + final KafkaIndexTask task1 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new 
KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + false + ), + null + ); + final KafkaIndexTask task2 = createTask( + null, + new KafkaIOConfig( + "sequence1", + new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), + kafkaServer.consumerProperties(), + false + ), + null + ); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Run first task + final ListenableFuture future1 = runTask(task1); + Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode()); + + // Check published segments & metadata + SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())); + + // Run second task + final ListenableFuture future2 = runTask(task2); + Assert.assertEquals(TaskStatus.Status.SUCCESS, future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1); + SegmentDescriptor desc4 = SD(task2, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors()); + Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentDim1(desc4)); + } + + @Test(timeout = 60_000L) + public void testRunOneTaskTwoPartitions() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Wait for tasks to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + SegmentDescriptor desc3 = SD(task, "2012/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); + 
Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3)); + } + + @Test(timeout = 60_000L) + public void testRunTwoTasksTwoPartitions() throws Exception + { + final KafkaIndexTask task1 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + final KafkaIndexTask task2 = createTask( + null, + new KafkaIOConfig( + "sequence1", + new KafkaPartitions("topic0", ImmutableMap.of(1, 0L)), + new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future1 = runTask(task1); + final ListenableFuture future2 = runTask(task2); + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Wait for tasks to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); + SegmentDescriptor desc3 = SD(task2, "2012/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3)); + } + + @Test(timeout = 60_000L) + public void testRestore() throws Exception + { + final KafkaIndexTask task1 = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future1 = runTask(task1); + + // Insert some data, but not enough for the task to finish + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : Iterables.limit(RECORDS, 4)) { + kafkaProducer.send(record).get(); + } + } + + while (countEvents(task1) != 2) { + Thread.sleep(25); + } + + Assert.assertEquals(2, countEvents(task1)); + + // Stop gracefully + task1.stopGracefully(); + 
Assert.assertEquals(TaskStatus.Status.FAILED, future1.get().getStatusCode()); + + // Start a new task + final KafkaIndexTask task2 = createTask( + task1.getId(), + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true + ), + null + ); + + final ListenableFuture future2 = runTask(task2); + + // Insert remaining data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : Iterables.skip(RECORDS, 4)) { + kafkaProducer.send(record).get(); + } + } + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future2.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(2, task1.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + + // Check published segments & metadata + SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + private ListenableFuture runTask(final Task task) + { + try { + taskStorage.insert(task, TaskStatus.running(task.getId())); + } + catch (EntryExistsException e) { + // suppress + } + taskLockbox.syncFromStorage(); + final TaskToolbox toolbox = toolboxFactory.build(task); + synchronized (runningTasks) { + runningTasks.add(task); + } + return taskExec.submit( + new Callable() + { + @Override + public TaskStatus call() throws Exception + { + try { + if (task.isReady(toolbox.getTaskActionClient())) { + return task.run(toolbox); + } else { + throw new ISE("Task is not ready"); + } + } + catch (Exception e) { + log.warn(e, "Task failed"); + return TaskStatus.failure(task.getId()); + } + } + } + ); + } + + private TaskLock getLock(final Task task, final Interval interval) + { + return Iterables.find( + taskLockbox.findLocksForTask(task), + new Predicate() + { + @Override + public boolean apply(TaskLock lock) + { + return lock.getInterval().contains(interval); + } + } + ); + } + + private KafkaIndexTask createTask( + final String taskId, + final KafkaIOConfig ioConfig, + final Integer maxRowsPerSegment + ) + { + final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( + 1000, + maxRowsPerSegment, + new Period("P1Y"), + null, + null, + null, + buildV9Directly, + reportParseExceptions, + handoffConditionTimeout + ); + return new KafkaIndexTask( + taskId, + null, + DATA_SCHEMA, + tuningConfig, + ioConfig, + null + ); + } + + private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate() + { + return new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap., QueryRunnerFactory>of( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest( + new 
IntervalChunkingQueryRunnerDecorator(null, null, null) + { + @Override + public QueryRunner decorate( + QueryRunner delegate, QueryToolChest> toolChest + ) + { + return delegate; + } + } + ), + new TimeseriesQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + // do nothing + } + } + ) + ) + ); + } + + private void makeToolboxFactory() throws IOException + { + directory = tempFolder.newFolder(); + final TestUtils testUtils = new TestUtils(); + final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + for (Module module : new KafkaIndexTaskModule().getJacksonModules()) { + objectMapper.registerModule(module); + } + final TaskConfig taskConfig = new TaskConfig( + new File(directory, "taskBaseDir").getPath(), + null, + null, + 50000, + null, + false, + null, + null + ); + final TestDerbyConnector derbyConnector = derby.getConnector(); + derbyConnector.createDataSourceTable(); + derbyConnector.createPendingSegmentsTable(); + derbyConnector.createSegmentTable(); + derbyConnector.createRulesTable(); + derbyConnector.createConfigTable(); + derbyConnector.createTaskTables(); + derbyConnector.createAuditTable(); + taskStorage = new MetadataTaskStorage( + derbyConnector, + new TaskStorageConfig(null), + new SQLMetadataStorageActionHandlerFactory( + derbyConnector, + derby.metadataTablesConfigSupplier().get(), + objectMapper + ) + ); + metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator( + testUtils.getTestObjectMapper(), + derby.metadataTablesConfigSupplier().get(), + derbyConnector + ); + taskLockbox = new TaskLockbox(taskStorage); + final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( + taskLockbox, + metadataStorageCoordinator, + emitter + ); + final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( + taskStorage, + taskActionToolbox + ); + final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory() + { + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return new SegmentHandoffNotifier() + { + @Override + public boolean registerSegmentHandoffCallback( + SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable + ) + { + if (doHandoff) { + // Simulate immediate handoff + exec.execute(handOffRunnable); + } + return true; + } + + @Override + public void start() + { + //Noop + } + + @Override + public void close() + { + //Noop + } + }; + } + }; + final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); + dataSegmentPusherConfig.storageDirectory = getSegmentDirectory(); + final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig, objectMapper); + toolboxFactory = new TaskToolboxFactory( + taskConfig, + taskActionClientFactory, + emitter, + dataSegmentPusher, + new TestDataSegmentKiller(), + null, // DataSegmentMover + null, // DataSegmentArchiver + new TestDataSegmentAnnouncer(), + handoffNotifierFactory, + makeTimeseriesOnlyConglomerate(), + MoreExecutors.sameThreadExecutor(), // queryExecutorService + EasyMock.createMock(MonitorScheduler.class), + new SegmentLoaderFactory( + new SegmentLoaderLocalCacheManager( + null, + new SegmentLoaderConfig() + { + @Override + public List getLocations() + { + return Lists.newArrayList(); + } + }, testUtils.getTestObjectMapper() + ) + ), + testUtils.getTestObjectMapper(), + testUtils.getTestIndexMerger(), + testUtils.getTestIndexIO(), + 
MapCache.create(1024), + new CacheConfig(), + testUtils.getTestIndexMergerV9() + ); + } + + private void destroyToolboxFactory() + { + toolboxFactory = null; + taskStorage = null; + taskLockbox = null; + metadataStorageCoordinator = null; + } + + private Set publishedDescriptors() throws IOException + { + return FluentIterable.from( + metadataStorageCoordinator.getUsedSegmentsForInterval( + DATA_SCHEMA.getDataSource(), + new Interval("0000/3000") + ) + ).transform( + new Function() + { + @Override + public SegmentDescriptor apply(DataSegment input) + { + return input.toDescriptor(); + } + } + ).toSet(); + } + + private File getSegmentDirectory() + { + return new File(directory, "segments"); + } + + private List readSegmentDim1(final SegmentDescriptor descriptor) throws IOException + { + File indexZip = new File( + String.format( + "%s/%s/%s_%s/%s/%d/index.zip", + getSegmentDirectory(), + DATA_SCHEMA.getDataSource(), + descriptor.getInterval().getStart(), + descriptor.getInterval().getEnd(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + ) + ); + File outputLocation = new File( + directory, + String.format( + "%s_%s_%s_%s", + descriptor.getInterval().getStart(), + descriptor.getInterval().getEnd(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + ) + ); + outputLocation.mkdir(); + CompressionUtils.unzip( + Files.asByteSource(indexZip), + outputLocation, + Predicates.alwaysFalse(), + false + ); + IndexIO indexIO = new TestUtils().getTestIndexIO(); + QueryableIndex index = indexIO.loadIndex(outputLocation); + DictionaryEncodedColumn dim1 = index.getColumn("dim1").getDictionaryEncoding(); + List values = Lists.newArrayList(); + for (int i = 0; i < dim1.length(); i++) { + int id = dim1.getSingleValueRow(i); + String value = dim1.lookupName(id); + values.add(value); + } + return values; + } + + public long countEvents(final Task task) throws Exception + { + // Do a query. + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(DATA_SCHEMA.getDataSource()) + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("rows", "rows") + ) + ).granularity(QueryGranularity.ALL) + .intervals("0000/3000") + .build(); + + ArrayList> results = Sequences.toList( + task.getQueryRunner(query).run(query, ImmutableMap.of()), + Lists.>newArrayList() + ); + + return results.isEmpty() ? 0 : results.get(0).getValue().getLongMetric("rows"); + } + + private static byte[] JB(String timestamp, String dim1, String dim2, double met1) + { + try { + return new ObjectMapper().writeValueAsBytes( + ImmutableMap.of("timestamp", timestamp, "dim1", dim1, "dim2", dim2, "met1", met1) + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + private SegmentDescriptor SD(final Task task, final String intervalString, final int partitionNum) + { + final Interval interval = new Interval(intervalString); + return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java new file mode 100644 index 000000000000..9a98d430359d --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java @@ -0,0 +1,114 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.kafka.test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.SystemTime$; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import scala.Some; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +public class TestBroker implements Closeable +{ + private final String zookeeperConnect; + private final File directory; + private final int id; + private final Map brokerProps; + + private volatile KafkaServer server; + + public TestBroker(String zookeeperConnect, File directory, int id, Map brokerProps) + { + this.zookeeperConnect = zookeeperConnect; + this.directory = directory; + this.id = id; + this.brokerProps = brokerProps == null ? 
ImmutableMap.of() : brokerProps; + } + + public void start() + { + final Properties props = new Properties(); + props.setProperty("zookeeper.connect", zookeeperConnect); + props.setProperty("log.dirs", directory.toString()); + props.setProperty("broker.id", String.valueOf(id)); + props.setProperty("port", String.valueOf(new Random().nextInt(9999) + 10000)); + props.putAll(brokerProps); + + final KafkaConfig config = new KafkaConfig(props); + + server = new KafkaServer(config, SystemTime$.MODULE$, Some.apply(String.format("TestingBroker[%d]-", id))); + server.startup(); + } + + public int getPort() + { + return server.socketServer().config().port(); + } + + public KafkaProducer newProducer() + { + return new KafkaProducer(producerProperties()); + } + + public KafkaConsumer newConsumer() + { + return new KafkaConsumer(consumerProperties()); + } + + public Map producerProperties() + { + final Map props = Maps.newHashMap(); + props.put("bootstrap.servers", String.format("localhost:%d", getPort())); + props.put("key.serializer", ByteArraySerializer.class.getName()); + props.put("value.serializer", ByteArraySerializer.class.getName()); + props.put("acks", "all"); + return props; + } + + public Map consumerProperties() + { + final Map props = Maps.newHashMap(); + props.put("bootstrap.servers", String.format("localhost:%d", getPort())); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + return props; + } + + @Override + public void close() throws IOException + { + if (server != null) { + server.shutdown(); + server.awaitShutdown(); + } + } +} diff --git a/pom.xml b/pom.xml index a70e710d8b58..a3c857fe5662 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ extensions-core/histogram extensions-core/kafka-eight extensions-core/kafka-extraction-namespace + extensions-core/kafka-indexing-service extensions-core/mysql-metadata-storage extensions-core/postgresql-metadata-storage extensions-core/namespace-lookup
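
Usage sketch (not part of the patch): because each KafkaIndexTask reads a fixed offset range and then exits, some supervising process is expected to assemble and submit new tasks as needed. The snippet below shows how such a task could be built from the classes added here, mirroring the constructor calls in KafkaIndexTaskTest.createTask(). The parameter comments, the Map<String, String> type for the consumer properties, and the meaning of the null tuning values are assumptions; the nulls simply fall back to whatever defaults KafkaTuningConfig provides.

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaIndexTask;
import io.druid.indexing.kafka.KafkaPartitions;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.segment.indexing.DataSchema;
import org.joda.time.Period;

import java.util.Map;

public class KafkaIndexTaskExample
{
  public static KafkaIndexTask buildTask(DataSchema dataSchema, Map<String, String> consumerProperties)
  {
    // Read partition 0 of "topic0" from offset 2 up to offset 5 -- the same range the tests use.
    final KafkaIOConfig ioConfig = new KafkaIOConfig(
        "sequence0",                                            // base sequence name (replica tasks in the tests share it)
        new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),  // start partitions/offsets
        new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),  // end partitions/offsets
        consumerProperties,                                     // bootstrap.servers, key/value deserializers, ...
        true                                                    // use dataSource metadata transactions
                                                                // (the "WithoutTransactions" tests pass false here)
    );

    // Same argument order as KafkaIndexTaskTest.createTask(); the nulls fall back to defaults.
    final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
        1000,               // in-memory row buffer size (assumed meaning)
        null,               // maxRowsPerSegment
        new Period("P1Y"),  // intermediate persist period (assumed meaning)
        null, null, null,
        true,               // buildV9Directly
        false,              // reportParseExceptions
        0L                  // handoffConditionTimeout
    );

    // The tests pass null for the task id and context as well.
    return new KafkaIndexTask(null, null, dataSchema, tuningConfig, ioConfig, null);
  }
}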