diff --git a/.travis.yml b/.travis.yml index b54e29edefcd..f787f99c34e0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -307,6 +307,7 @@ jobs: # Integration tests Java Compile version is set by the machine environment jdk (set by the jdk key) # Integration tests Java Runtime version is set by the JVM_RUNTIME env property (set env key to -Djvm.runtime=) + # Integration tests will either use MiddleManagers or Indexers # (Currently integration tests only support running with jvm runtime 8 and 11) # START - Integration tests for Compile with Java 8 and Run with Java 8 - &integration_batch_index @@ -314,9 +315,9 @@ jobs: jdk: openjdk8 services: &integration_test_services - docker - env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: &run_integration_test - - ${MVN} verify -pl integration-tests -P integration-tests ${TESTNG_GROUPS} ${JVM_RUNTIME} ${MAVEN_SKIP} + - ${MVN} verify -pl integration-tests -P integration-tests ${TESTNG_GROUPS} ${JVM_RUNTIME} -Dit.indexer=${USE_INDEXER} ${MAVEN_SKIP} after_failure: &integration_test_diags - for v in ~/shared/logs/*.log ; do echo $v logtail ======================== ; tail -100 $v ; @@ -326,51 +327,75 @@ jobs: docker exec -it druid-$v sh -c 'dmesg | tail -3' ; done + - <<: *integration_batch_index + name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_input_format name: "(Compile=openjdk8, Run=openjdk8) input format integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_input_format + name: "(Compile=openjdk8, Run=openjdk8) input format integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_input_source name: "(Compile=openjdk8, Run=openjdk8) input source integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_input_source + name: "(Compile=openjdk8, Run=openjdk8) input source integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_perfect_rollup_parallel_batch_index name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_perfect_rollup_parallel_batch_index + name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' 
USE_INDEXER='indexer' + - &integration_kafka_index name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_kafka_index + name: "(Compile=openjdk8, Run=openjdk8) kafka index, transactional kafka index integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=kafka-index,kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_kafka_index_slow name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_kafka_index_slow + name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow with Indexer" + env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_kafka_transactional_index name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags @@ -378,23 +403,31 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_kafka_transactional_index_slow + name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow with Indexer" + env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_kafka_format_tests - name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats" - jdk: openjdk8 - services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' - script: *run_integration_test - after_failure: *integration_test_diags + name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + script: *run_integration_test + after_failure: *integration_test_diags + + - <<: *integration_kafka_format_tests + name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats with Indexer" + env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - &integration_query name: "(Compile=openjdk8, Run=openjdk8) query integration test" jdk: openjdk8 services: *integration_test_services - 
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags @@ -402,7 +435,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) query retry integration test for missing segments" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags @@ -410,7 +443,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) security integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags @@ -418,7 +451,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) realtime index integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags @@ -426,82 +459,94 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_append_ingestion + name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_compaction_tests name: "(Compile=openjdk8, Run=openjdk8) compaction integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + - <<: *integration_compaction_tests + name: "(Compile=openjdk8, Run=openjdk8) compaction integration test with Indexer" + env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + - &integration_tests - name: "(Compile=openjdk8, Run=openjdk8) other integration test" + name: "(Compile=openjdk8, Run=openjdk8) other integration tests" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' + env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags + + - <<: *integration_tests + name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer" + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' # END - Integration tests for Compile with Java 8 and Run with Java 8 # START - Integration tests for Compile with Java 8 and Run with Java 11 - <<: *integration_batch_index name: "(Compile=openjdk8, Run=openjdk11) batch index integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_input_format name: "(Compile=openjdk8, Run=openjdk11) input format integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=input-format' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_input_source name: "(Compile=openjdk8, Run=openjdk11) input source integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=input-source' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_perfect_rollup_parallel_batch_index name: "(Compile=openjdk8, Run=openjdk11) perfect rollup parallel batch index integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_query name: "(Compile=openjdk8, Run=openjdk11) query integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_query_retry name: "(Compile=openjdk8, Run=openjdk11) query retry integration test for missing segments" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: 
*integration_security name: "(Compile=openjdk8, Run=openjdk11) security integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=security' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_realtime_index name: "(Compile=openjdk8, Run=openjdk11) realtime index integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_append_ingestion name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_compaction_tests name: "(Compile=openjdk8, Run=openjdk11) compaction integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-Dgroups=compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' # END - Integration tests for Compile with Java 8 and Run with Java 11 - name: "security vulnerabilities" diff --git a/docs/design/indexer.md b/docs/design/indexer.md index 791bde1a44b8..fc47bb75fd4b 100644 --- a/docs/design/indexer.md +++ b/docs/design/indexer.md @@ -89,6 +89,8 @@ By default, the number of concurrent persist/merge operations is limited to (`dr Separate task logs are not currently supported when using the Indexer; all task log messages will instead be logged in the Indexer process log. -The Indexer currently imposes an identical memory limit on each task. In later releases, the per-task memory limit will be removed and only the global limit will apply. The limit on concurrent merges will also be removed. +The Indexer currently imposes an identical memory limit on each task. In later releases, the per-task memory limit will be removed and only the global limit will apply. 
The limit on concurrent merges will also be removed. -In later releases, per-task memory usage will be dynamically managed. Please see https://github.com/apache/druid/issues/7900 for details on future enhancements to the Indexer. \ No newline at end of file +The Indexer does not work properly with [`index_realtime`](../ingestion/tasks.md#index_realtime) task types. Therefore, it is not compatible with [Tranquility](../ingestion/tranquility.md). If you are using Tranquility, consider migrating to Druid's builtin [Apache Kafka](../development/extensions-core/kafka-ingestion.md) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md) ingestion options. + +In later releases, per-task memory usage will be dynamically managed. Please see https://github.com/apache/druid/issues/7900 for details on future enhancements to the Indexer. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 83849fefd136..a316ce8d7867 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -44,6 +44,7 @@ import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory; import org.apache.druid.indexing.firehose.WindowedSegmentId; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; @@ -82,7 +83,7 @@ /** * Abstract class for batch tasks like {@link IndexTask}. * Provides some methods such as {@link #determineSegmentGranularity}, {@link #findInputSegments}, - * and {@link #determineLockGranularityandTryLock} for easily acquiring task locks. + * and {@link #determineLockGranularityAndTryLock} for easily acquiring task locks. */ public abstract class AbstractBatchIndexTask extends AbstractTask { @@ -122,6 +123,17 @@ protected AbstractBatchIndexTask( @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { + if (taskLockHelper == null) { + // Subclasses generally use "isReady" to initialize the taskLockHelper. It's not guaranteed to be called before + // "run", and so we call it here to ensure it happens. + // + // We're only really calling it for its side effects, and we expect it to return "true". If it doesn't, something + // strange is going on, so bail out. + if (!isReady(toolbox.getTaskActionClient())) { + throw new ISE("Cannot start; not ready!"); + } + } + synchronized (this) { if (stopped) { return TaskStatus.failure(getId()); @@ -251,9 +263,17 @@ public TaskLockHelper getTaskLockHelper() } /** - * Determine lockGranularity to use and try to acquire necessary locks. - * This method respects the value of 'forceTimeChunkLock' in task context. - * If it's set to false or missing, this method checks if this task can use segmentLock. + * Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec. + * + * This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them + * to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}. + * + * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. 
+ * + * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method + * will initialize {@link #taskLockHelper} as a side effect. + * + * @return whether the lock was acquired */ protected boolean determineLockGranularityAndTryLock( TaskActionClient client, @@ -263,10 +283,20 @@ protected boolean determineLockGranularityAndTryLock( final List intervals = granularitySpec.bucketIntervals().isPresent() ? new ArrayList<>(granularitySpec.bucketIntervals().get()) : Collections.emptyList(); - return determineLockGranularityandTryLock(client, intervals); + return determineLockGranularityAndTryLock(client, intervals); } - boolean determineLockGranularityandTryLock(TaskActionClient client, List intervals) throws IOException + /** + * Attempts to acquire a lock that covers certain intervals. + * + * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. + * + * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method + * will initialize {@link #taskLockHelper} as a side effect. + * + * @return whether the lock was acquired + */ + boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals) throws IOException { final boolean forceTimeChunkLock = getContextValue( Tasks.FORCE_TIME_CHUNK_LOCK_KEY, @@ -287,12 +317,22 @@ boolean determineLockGranularityandTryLock(TaskActionClient client, List segments, BiConsumer> segmentCheckFunction @@ -396,7 +436,8 @@ private LockGranularityDetermineResult determineSegmentGranularity(List !segmentGranularityFromSpec.isAligned(segment.getInterval())))) { + || segments.stream() + .anyMatch(segment -> !segmentGranularityFromSpec.isAligned(segment.getInterval())))) { // This case is one of the followings: // 1) Segments have different granularities. // 2) Segment granularity in ingestion spec is different from the one of existig segments. 
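The new javadoc above describes how `determineLockGranularityAndTryLock` respects `Tasks.FORCE_TIME_CHUNK_LOCK_KEY` and initializes `taskLockHelper` as a side effect, which is what allows the new guard in `run()` to fall back to `isReady` when the lock helper was not initialized earlier. As a rough illustration only (a hypothetical fragment, not taken from this patch; the interval value is made up), a batch subclass's `isReady` typically just delegates to that method, much as `IndexTask` and `CompactionTask` do below:

```java
// Hypothetical fragment of a subclass of AbstractBatchIndexTask, assumed to live in
// org.apache.druid.indexing.common.task so it can reach the package-private overload.
// Uses org.apache.druid.indexing.common.actions.TaskActionClient,
// org.apache.druid.java.util.common.Intervals, org.joda.time.Interval, java.util.Collections.
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  // Intervals to lock; a real task would derive these from its ingestion spec.
  final List<Interval> intervals =
      Collections.singletonList(Intervals.of("2020-01-01/2020-01-02"));

  // Respects Tasks.FORCE_TIME_CHUNK_LOCK_KEY from the task context and, when the lock is
  // acquired, initializes taskLockHelper as a side effect (see the javadoc above).
  return determineLockGranularityAndTryLock(taskActionClient, intervals);
}
```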
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index afaf0cfb01e9..e773b6f95d33 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -312,7 +312,7 @@ public int getPriority() public boolean isReady(TaskActionClient taskActionClient) throws Exception { final List segments = segmentProvider.findSegments(taskActionClient); - return determineLockGranularityandTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments); + return determineLockGranularityAndTryLockWithSegments(taskActionClient, segments, segmentProvider::checkSegments); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index f351a5db6c7b..69484b54f13a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -476,7 +476,7 @@ public TaskStatus runTask(final TaskToolbox toolbox) final List allocateIntervals = new ArrayList<>(partitionAnalysis.getAllIntervalsToIndex()); final DataSchema dataSchema; if (determineIntervals) { - if (!determineLockGranularityandTryLock(toolbox.getTaskActionClient(), allocateIntervals)) { + if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), allocateIntervals)) { throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 197d9017fa4e..5529aebc6423 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -160,8 +160,11 @@ default int getPriority() /** * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The * actions must be idempotent, since this method may be executed multiple times. This typically runs on the - * coordinator. If this method throws an exception, the task should be considered a failure. - *

+ * Overlord. If this method throws an exception, the task should be considered a failure. + * + * This method will not necessarily be executed before {@link #run(TaskToolbox)}, since this task readiness check + * may be done on a different machine from the one that actually runs the task. + * * This method must be idempotent, as it may be run multiple times per task. * * @param taskActionClient action client for this task (not the full toolbox) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java index af80d8f9dce5..1e9dbc8fd5cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java @@ -51,7 +51,7 @@ * - {@link #verifyAndLockExistingSegments} is to verify the granularity of existing segments and lock them. * This method must be called before the task starts indexing. * - Tells the task what {@link LockGranularity} it should use. Note that the LockGranularity is determined in - * {@link AbstractBatchIndexTask#determineLockGranularityandTryLock}. + * {@link AbstractBatchIndexTask#determineLockGranularityAndTryLock}. * - Provides some util methods for {@link LockGranularity#SEGMENT}. Also caches the information of locked segments when * - the SEGMENt lock granularity is used. */ diff --git a/integration-tests/README.md b/integration-tests/README.md index bf9baa86d402..58ecf2fd5944 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -37,7 +37,7 @@ Integration Testing Using Docker Before starting, if you don't already have docker on your machine, install it as described on [Docker installation instructions](https://docs.docker.com/install/). Ensure that you have at least 4GB of memory allocated to the docker engine. (You can verify it -under Preferences > Advanced.) +under Preferences > Resources > Advanced.) Also set the `DOCKER_IP` environment variable to localhost on your system, as follows: @@ -72,31 +72,40 @@ Druid routers for security group integration test (permissive tls, no client aut ## Docker compose -Docker compose yamls located in "docker" folder +There are a few different Docker compose yamls located in "docker" folder. Before you can run any of these, you must +build the Docker images. See "Manually bringing up Docker containers and running tests" below. docker-compose.base.yml - Base file that defines all containers for integration test docker-compose.yml - Defines Druid cluster with default configuration that is used for running integration tests in Travis CI. - - docker-compose -f docker-compose.yml up - // DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests. - DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up + +``` +docker-compose -f docker-compose.yml up +// DRUID_INTEGRATION_TEST_GROUP - this variable is used in Druid docker container for "security" and "query" test group. Use next docker-compose if you want to run security/query tests. +DRUID_INTEGRATION_TEST_GROUP=security docker-compose -f docker-compose.yml up +``` docker-compose.override-env.yml - Defines Druid cluster with default configuration plus any additional and/or overriden configurations from override-env file. 
- // OVERRIDE_ENV - variable that must contains path to Druid configuration file - OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up - +``` +// OVERRIDE_ENV - variable that must contains path to Druid configuration file +OVERRIDE_ENV=./environment-configs/override-examples/s3 docker-compose -f docker-compose.override-env.yml up +``` + docker-compose.security.yml - Defines three additional Druid router services with permissive tls, no client auth tls, and custom check tls respectively. This is meant to be use together with docker-compose.yml or docker-compose.override-env.yml and is only needed for the "security" group integration test. - docker-compose -f docker-compose.yml -f docker-compose.security.yml up - +``` +docker-compose -f docker-compose.yml -f docker-compose.security.yml up +``` + docker-compose.druid-hadoop.yml - for starting Apache Hadoop 2.8.5 cluster with the same setup as the Druid tutorial - docker-compose -f docker-compose.druid-hadoop.yml up +``` +docker-compose -f docker-compose.druid-hadoop.yml up +``` -## Manual bringing up docker containers and running tests +## Manually bringing up Docker containers and running tests 1. Build druid-cluster, druid-hadoop docker images. From root module run maven command: ``` @@ -106,30 +115,38 @@ mvn clean install -pl integration-tests -P integration-tests -Ddocker.run.skip=t 2. Run druid cluster by docker-compose: ``` -- Basic Druid cluster (skip this if running Druid cluster with override configs): +# Basic Druid cluster (skip this if running Druid cluster with override configs): docker-compose -f integration-tests/docker/docker-compose.yml up -- Druid cluster with override configs (skip this if running Basic Druid cluster): + +# Druid cluster with override configs (skip this if running Basic Druid cluster): OVERRIDE_ENV= docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up -- Druid hadoop (if needed): + +# Druid hadoop (if needed): docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up -- Druid routers for security group integration test (if needed): - docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up + +# Druid routers for security group integration test (if needed): +docker-compose -f ${DOCKERDIR}/docker-compose.security.yml up ``` 3. Run maven command to execute tests with -Ddocker.build.skip=true -Ddocker.run.skip=true +For example: + +``` +mvn verify -P integration-tests -pl integration-tests -Dit.test=ITIndexerTest -Ddocker.build.skip=true -Ddocker.run.skip=true +``` + ## Tips & tricks for debugging and developing integration tests ### Useful mvn command flags -- -Ddocker.build.skip=true to skip build druid containers. -If you do not apply any change to druid then you can do not rebuild druid. -This can save ~4 minutes to build druid cluster and druid hadoop. +- -Ddocker.build.skip=true to skip building the containers. +If you do not apply any change to Druid then you skip rebuilding the containers. This can save ~4 minutes. You need to build druid containers only once, after you can skip docker build step. - -Ddocker.run.skip=true to skip starting docker containers. This can save ~3 minutes by skipping building and bringing up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc). Please make sure that you actually do have these containers already running if using this flag. 
Additionally, please make sure that the running containers -are in the same state that the setup script (run_cluster.sh) would have brought it up in. +are in the same state that the setup script (run_cluster.sh) would have brought it up in. ### Debugging Druid while running tests diff --git a/integration-tests/build_run_cluster.sh b/integration-tests/build_run_cluster.sh index ced683eef9f1..97176e5ae865 100755 --- a/integration-tests/build_run_cluster.sh +++ b/integration-tests/build_run_cluster.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +set -e + echo $DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH export DIR=$(cd $(dirname $0) && pwd) diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index dbd8a3618241..54fab63b4fd9 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -94,11 +94,12 @@ EXPOSE 8300 8301 8302 8303 8304 8305 EXPOSE 9092 9093 WORKDIR /var/lib/druid -ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \ +ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \ + && . /druid.sh \ # Create druid service config files with all the config variables - && . /druid.sh; setupConfig \ + && setupConfig \ # Some test groups require pre-existing data to be setup - && . /druid.sh; setupData \ + && setupData \ # Export the service config file path to use in supervisord conf file && export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \ # Export the common config file path to use in supervisord conf file diff --git a/integration-tests/docker/docker-compose.base.yml b/integration-tests/docker/docker-compose.base.yml index 1f7931a623af..1dc1949203a1 100644 --- a/integration-tests/docker/docker-compose.base.yml +++ b/integration-tests/docker/docker-compose.base.yml @@ -45,7 +45,7 @@ services: - 51111:51111 networks: druid-it-net: - ipv4_address: 172.172.172.13 + ipv4_address: 172.172.172.15 privileged: true volumes: - ${HOME}/shared:/shared @@ -173,12 +173,31 @@ services: - ./environment-configs/common - ./environment-configs/middlemanager + druid-indexer: + image: druid/cluster + container_name: druid-indexer + networks: + druid-it-net: + ipv4_address: 172.172.172.8 + ports: + - 5008:5008 + - 8091:8091 + - 8291:8291 + privileged: true + volumes: + - ./../src/test/resources:/resources + - ${HOME}/shared:/shared + - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf + env_file: + - ./environment-configs/common + - ./environment-configs/indexer + druid-broker: image: druid/cluster container_name: druid-broker networks: druid-it-net: - ipv4_address: 172.172.172.8 + ipv4_address: 172.172.172.9 ports: - 5005:5005 - 8082:8082 @@ -196,7 +215,7 @@ services: container_name: druid-router networks: druid-it-net: - ipv4_address: 172.172.172.9 + ipv4_address: 172.172.172.10 ports: - 5004:5004 - 8888:8888 @@ -214,7 +233,7 @@ services: container_name: druid-router-permissive-tls networks: druid-it-net: - ipv4_address: 172.172.172.10 + ipv4_address: 172.172.172.11 ports: - 5001:5001 - 8889:8889 @@ -232,7 +251,7 @@ services: container_name: druid-router-no-client-auth-tls networks: druid-it-net: - ipv4_address: 172.172.172.11 + ipv4_address: 172.172.172.12 ports: - 5002:5002 - 8890:8890 @@ -250,7 +269,7 @@ services: container_name: druid-router-custom-check-tls networks: druid-it-net: - ipv4_address: 172.172.172.12 + ipv4_address: 172.172.172.13 ports: - 5003:5003 - 8891:8891 diff --git 
a/integration-tests/docker/docker-compose.cli-indexer.yml b/integration-tests/docker/docker-compose.cli-indexer.yml new file mode 100644 index 000000000000..fad6ec12ce89 --- /dev/null +++ b/integration-tests/docker/docker-compose.cli-indexer.yml @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "2.2" +services: + druid-zookeeper-kafka: + extends: + file: docker-compose.base.yml + service: druid-zookeeper-kafka + + druid-metadata-storage: + extends: + file: docker-compose.base.yml + service: druid-metadata-storage + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + + druid-overlord: + extends: + file: docker-compose.base.yml + service: druid-overlord + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-metadata-storage:druid-metadata-storage + - druid-zookeeper-kafka:druid-zookeeper-kafka + depends_on: + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-coordinator: + extends: + file: docker-compose.base.yml + service: druid-coordinator + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-overlord:druid-overlord + - druid-metadata-storage:druid-metadata-storage + - druid-zookeeper-kafka:druid-zookeeper-kafka + depends_on: + - druid-overlord + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-historical: + extends: + file: docker-compose.base.yml + service: druid-historical + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-zookeeper-kafka:druid-zookeeper-kafka + depends_on: + - druid-zookeeper-kafka + + druid-indexer: + extends: + file: docker-compose.base.yml + service: druid-indexer + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-zookeeper-kafka:druid-zookeeper-kafka + - druid-overlord:druid-overlord + depends_on: + - druid-zookeeper-kafka + - druid-overlord + + druid-broker: + extends: + file: docker-compose.base.yml + service: druid-broker + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-zookeeper-kafka:druid-zookeeper-kafka + - druid-indexer:druid-indexer + - druid-historical:druid-historical + depends_on: + - druid-zookeeper-kafka + - druid-indexer + - druid-historical + + druid-router: + extends: + file: docker-compose.base.yml + service: druid-router + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + links: + - druid-zookeeper-kafka:druid-zookeeper-kafka + - druid-coordinator:druid-coordinator + - druid-broker:druid-broker + depends_on: + - druid-zookeeper-kafka + - druid-coordinator + - druid-broker + +networks: + druid-it-net: + name: druid-it-net + ipam: + config: + - 
subnet: 172.172.172.0/24 \ No newline at end of file diff --git a/integration-tests/docker/docker-compose.query-retry-test.yml b/integration-tests/docker/docker-compose.query-retry-test.yml index 9b5a5a6ee8b8..98d55e846708 100644 --- a/integration-tests/docker/docker-compose.query-retry-test.yml +++ b/integration-tests/docker/docker-compose.query-retry-test.yml @@ -101,7 +101,7 @@ services: container_name: druid-historical-for-query-retry-test networks: druid-it-net: - ipv4_address: 172.172.172.13 + ipv4_address: 172.172.172.14 ports: - 8084:8083 - 8284:8283 diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index f3269ee79abc..aff352213264 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -25,6 +25,7 @@ getConfPath() historical) echo $cluster_conf_base/data/historical ;; historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;; middleManager) echo $cluster_conf_base/data/middleManager ;; + indexer) echo $cluster_conf_base/data/indexer ;; coordinator) echo $cluster_conf_base/master/coordinator ;; broker) echo $cluster_conf_base/query/broker ;; router) echo $cluster_conf_base/query/router ;; diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index 3f2424f5d795..20da489ea9c2 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -21,7 +21,7 @@ DRUID_SERVICE=broker DRUID_LOG_PATH=/shared/logs/broker.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx256m -Xms256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx192m -Xms192m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/docker/environment-configs/common b/integration-tests/docker/environment-configs/common index aba937b9f541..6e4849e81d33 100644 --- a/integration-tests/docker/environment-configs/common +++ b/integration-tests/docker/environment-configs/common @@ -22,7 +22,7 @@ LANGUAGE=C.UTF-8 LC_ALL=C.UTF-8 # JAVA OPTS -COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml +COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp DRUID_DEP_LIB_DIR=/shared/hadoop_xml:/shared/docker/lib/*:/usr/local/druid/lib/mysql-connector-java.jar # Druid configs diff --git a/integration-tests/docker/environment-configs/coordinator b/integration-tests/docker/environment-configs/coordinator index 04bfa9b6df59..3206d5403567 100644 --- a/integration-tests/docker/environment-configs/coordinator +++ b/integration-tests/docker/environment-configs/coordinator @@ -21,7 +21,7 @@ DRUID_SERVICE=coordinator DRUID_LOG_PATH=/shared/logs/coordinator.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 # Druid configs druid_metadata_storage_type=mysql diff --git a/integration-tests/docker/environment-configs/indexer b/integration-tests/docker/environment-configs/indexer new file mode 100644 index 000000000000..841ccacf5630 --- 
/dev/null +++ b/integration-tests/docker/environment-configs/indexer @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +DRUID_SERVICE=indexer +DRUID_LOG_PATH=/shared/logs/indexer.log + +# JAVA OPTS +SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008 + +# Druid configs +druid_server_http_numThreads=4 +druid_storage_storageDirectory=/shared/storage + +druid_processing_buffer_sizeBytes=25000000 +druid_processing_numThreads=1 +druid_selectors_indexing_serviceName=druid/overlord +druid_indexer_task_chathandler_type=announce +druid_auth_basic_common_cacheDirectory=/tmp/authCache/indexer +druid_startup_logging_logProperties=true +druid_server_https_crlPath=/tls/revocations.crl +druid_worker_capacity=10 diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord index ebb3d5bf18e4..2852df8a2bee 100644 --- a/integration-tests/docker/environment-configs/overlord +++ b/integration-tests/docker/environment-configs/overlord @@ -21,7 +21,7 @@ DRUID_SERVICE=overlord DRUID_LOG_PATH=/shared/logs/overlord.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009 # Druid configs druid_metadata_storage_type=mysql diff --git a/integration-tests/docker/environment-configs/router b/integration-tests/docker/environment-configs/router index c3f4e23d9669..1992e526ecde 100644 --- a/integration-tests/docker/environment-configs/router +++ b/integration-tests/docker/environment-configs/router @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004 # Druid configs druid_auth_basic_common_cacheDirectory=/tmp/authCache/router diff --git a/integration-tests/docker/environment-configs/router-custom-check-tls b/integration-tests/docker/environment-configs/router-custom-check-tls index ece8531d677c..d506a80a4bf6 100644 --- a/integration-tests/docker/environment-configs/router-custom-check-tls +++ b/integration-tests/docker/environment-configs/router-custom-check-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-custom-check-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003 
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003 # Druid configs druid_plaintextPort=8891 diff --git a/integration-tests/docker/environment-configs/router-no-client-auth-tls b/integration-tests/docker/environment-configs/router-no-client-auth-tls index 4b703bac5ee7..7d3cc8144ed6 100644 --- a/integration-tests/docker/environment-configs/router-no-client-auth-tls +++ b/integration-tests/docker/environment-configs/router-no-client-auth-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-no-client-auth-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002 # Druid configs druid_plaintextPort=8890 diff --git a/integration-tests/docker/environment-configs/router-permissive-tls b/integration-tests/docker/environment-configs/router-permissive-tls index 41346cb15610..d4aae6aa3ddc 100644 --- a/integration-tests/docker/environment-configs/router-permissive-tls +++ b/integration-tests/docker/environment-configs/router-permissive-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-permissive-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001 +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001 # Druid configs druid_plaintextPort=8889 diff --git a/integration-tests/docker/tls/set-docker-host-ip.sh b/integration-tests/docker/tls/set-docker-host-ip.sh index b7f0a300d1f6..66b14645950c 100755 --- a/integration-tests/docker/tls/set-docker-host-ip.sh +++ b/integration-tests/docker/tls/set-docker-host-ip.sh @@ -16,11 +16,17 @@ # limitations under the License. DOCKER_HOST_IP="$(host "$(hostname)" | perl -nle '/has address (.*)/ && print $1')" + if [ -z "$DOCKER_HOST_IP" ]; then # Mac specific way to get host ip DOCKER_HOST_IP="$(dscacheutil -q host -a name "$(HOSTNAME)" | perl -nle '/ip_address: (.*)/ && print $1' | tail -n1)" fi +if [ -z "$DOCKER_HOST_IP" ]; then + # Another Mac specific way, when the machine isn't able to resolve its own name + DOCKER_HOST_IP="$(ifconfig | fgrep 'inet ' | fgrep -v 127.0.0.1 | awk '{print $2}')" +fi + if [ -z "$DOCKER_HOST_IP" ]; then >&2 echo "Could not set docker host IP - integration tests can not run" exit 1 diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index cdc384914dcf..57f78b0029d9 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -370,6 +370,7 @@ false false false + middleManager @@ -397,6 +398,7 @@ ${resource.file.dir.path} ${docker.build.skip} ${docker.run.skip} + ${it.indexer} ${project.basedir}/build_run_cluster.sh diff --git a/integration-tests/script/copy_hadoop_resources.sh b/integration-tests/script/copy_hadoop_resources.sh index 82dd0d023d59..5f4c17e5c6a5 100755 --- a/integration-tests/script/copy_hadoop_resources.sh +++ b/integration-tests/script/copy_hadoop_resources.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+set -e + # wait for hadoop namenode to be up echo "Waiting for hadoop namenode to be up" docker exec -t druid-it-hadoop sh -c "./usr/local/hadoop/bin/hdfs dfs -mkdir -p /druid" @@ -41,4 +43,4 @@ echo "Finished setting up druid hadoop dirs" echo "Copying Hadoop XML files to shared" docker exec -t druid-it-hadoop sh -c "cp /usr/local/hadoop/etc/hadoop/*.xml /shared/hadoop_xml" -echo "Copied Hadoop XML files to shared" \ No newline at end of file +echo "Copied Hadoop XML files to shared" diff --git a/integration-tests/script/copy_resources.sh b/integration-tests/script/copy_resources.sh index 6ecd3077ba4e..6495dad44b1a 100755 --- a/integration-tests/script/copy_resources.sh +++ b/integration-tests/script/copy_resources.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +set -e + # setup client keystore ./docker/tls/generate-client-certs-and-keystores.sh rm -rf docker/client_tls diff --git a/integration-tests/script/docker_build_containers.sh b/integration-tests/script/docker_build_containers.sh index 30aa56a8de8f..ce491ca17707 100755 --- a/integration-tests/script/docker_build_containers.sh +++ b/integration-tests/script/docker_build_containers.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +set -e + # Build Druid Cluster Image if [ -z "$DRUID_INTEGRATION_TEST_JVM_RUNTIME" ] then diff --git a/integration-tests/script/docker_run_cluster.sh b/integration-tests/script/docker_run_cluster.sh index c5faa4aeb3c0..c5e37df74a1b 100755 --- a/integration-tests/script/docker_run_cluster.sh +++ b/integration-tests/script/docker_run_cluster.sh @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +set -e + # Create docker network { docker network create --subnet=172.172.172.0/24 druid-it-net @@ -45,21 +47,40 @@ fi docker-compose -f ${DOCKERDIR}/docker-compose.druid-hadoop.yml up -d fi + # Start Druid services if [ -z "$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH" ] then - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] - then - # Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls) - docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d - elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] - then - # Start default Druid services with an additional historical modified for query retry test - # See CliHistoricalForQueryRetryTest. 
- docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d - else - # Start default Druid services - docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d - fi + # Sanity check: DRUID_INTEGRATION_TEST_INDEXER must be "indexer" or "middleManager" + if [ "$DRUID_INTEGRATION_TEST_INDEXER" != "indexer" ] && [ "$DRUID_INTEGRATION_TEST_INDEXER" != "middleManager" ] + then + echo "DRUID_INTEGRATION_TEST_INDEXER must be 'indexer' or 'middleManager' (is '$DRUID_INTEGRATION_TEST_INDEXER')" + exit 1 + fi + + if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ] + then + # Sanity check: cannot combine CliIndexer tests with security, query-retry tests + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] + then + echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer" + exit 1 + fi + + # Replace MiddleManager with Indexer + docker-compose -f ${DOCKERDIR}/docker-compose.cli-indexer.yml up -d + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] + then + # Start default Druid services and additional druid router (custom-check-tls, permissive-tls, no-client-auth-tls) + docker-compose -f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.security.yml up -d + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] + then + # Start default Druid services with an additional historical modified for query retry test + # See CliHistoricalForQueryRetryTest. + docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d + else + # Start default Druid services + docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d + fi else # run druid cluster with override config OVERRIDE_ENV=$DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH docker-compose -f ${DOCKERDIR}/docker-compose.override-env.yml up -d diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index f2ca8995ca05..2be6eb3c9156 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -223,10 +223,10 @@ public String getTaskErrorMessage(String taskId) public void waitUntilTaskCompletes(final String taskID) { - waitUntilTaskCompletes(taskID, 10000, 60); + waitUntilTaskCompletes(taskID, ITRetryUtil.DEFAULT_RETRY_SLEEP, ITRetryUtil.DEFAULT_RETRY_COUNT); } - public void waitUntilTaskCompletes(final String taskID, final int millisEach, final int numTimes) + public void waitUntilTaskCompletes(final String taskID, final long millisEach, final int numTimes) { ITRetryUtil.retryUntil( new Callable() @@ -254,7 +254,7 @@ public void waitUntilTaskFails(final String taskID) } - public void waitUntilTaskFails(final String taskID, final int millisEach, final int numTimes) + public void waitUntilTaskFails(final String taskID, final long millisEach, final int numTimes) { ITRetryUtil.retryUntil( new Callable() diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java index d3bbd1f0cfc0..b8a6d3ee3534 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java @@ -48,8 +48,8 @@ 
public class DruidClusterAdminClient private static final Logger LOG = new Logger(DruidClusterAdminClient.class); private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator"; private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical"; - private static final String INDEXER_DOCKER_CONTAINER_NAME = "/druid-overlord"; - private static final String BROKERR_DOCKER_CONTAINER_NAME = "/druid-broker"; + private static final String OVERLORD_DOCKER_CONTAINER_NAME = "/druid-overlord"; + private static final String BROKER_DOCKER_CONTAINER_NAME = "/druid-broker"; private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router"; private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager"; @@ -79,14 +79,14 @@ public void restartHistoricalContainer() restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME); } - public void restartIndexerContainer() + public void restartOverlordContainer() { - restartDockerContainer(INDEXER_DOCKER_CONTAINER_NAME); + restartDockerContainer(OVERLORD_DOCKER_CONTAINER_NAME); } public void restartBrokerContainer() { - restartDockerContainer(BROKERR_DOCKER_CONTAINER_NAME); + restartDockerContainer(BROKER_DOCKER_CONTAINER_NAME); } public void restartRouterContainer() diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java index 356cfdd4fe2d..4be7f4f3be81 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java @@ -30,9 +30,9 @@ public class ITRetryUtil private static final Logger LOG = new Logger(ITRetryUtil.class); - public static final int DEFAULT_RETRY_COUNT = 30; + public static final int DEFAULT_RETRY_COUNT = 150; // 5 minutes - public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(10); + public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(2); public static void retryUntilTrue(Callable callable, String task) { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 7974da560d69..2f27c7faa0e1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -54,6 +54,8 @@ public class TestNGGroup public static final String QUERY_RETRY = "query-retry"; + public static final String CLI_INDEXER = "cli-indexer"; + public static final String REALTIME_INDEX = "realtime-index"; /** diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 713687e60254..7285af1f3473 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -215,7 +215,7 @@ void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception { testIndexWithLosingNodeHelper( - () -> druidClusterAdminClient.restartIndexerContainer(), + () -> druidClusterAdminClient.restartOverlordContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady(), 
transactionEnabled ); diff --git a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json index 9819ef212290..931ad8957781 100644 --- a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json +++ b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json @@ -11,7 +11,7 @@ }, { "query": { - "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL" + "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL AND server_type <> 'indexer'" }, "expectedResults": [ { diff --git a/integration-tests/src/test/resources/queries/sys_segment_queries.json b/integration-tests/src/test/resources/queries/sys_segment_queries.json index 3eef61f5a406..284c60272a68 100644 --- a/integration-tests/src/test/resources/queries/sys_segment_queries.json +++ b/integration-tests/src/test/resources/queries/sys_segment_queries.json @@ -16,7 +16,7 @@ }, { "query": { - "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL" + "query": "SELECT server_type FROM sys.servers WHERE tier IS NOT NULL AND server_type <> 'indexer'" }, "expectedResults": [ { diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json index 05091318b585..377cd304f6c5 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json @@ -10,8 +10,8 @@ "max_size": 5000000000 }, { - "server": "172.172.172.8:8282", - "host": "172.172.172.8", + "server": "172.172.172.9:8282", + "host": "172.172.172.9", "plaintext_port": 8082, "tls_port": 8282, "server_type": "broker", diff --git a/integration-tests/stop_cluster.sh b/integration-tests/stop_cluster.sh index 92f9c411c946..def6fb213f88 100755 --- a/integration-tests/stop_cluster.sh +++ b/integration-tests/stop_cluster.sh @@ -14,16 +14,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +set -e + # Skip stopping docker if flag set (For use during development) if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" == true ] then exit 0 fi -for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop; +for node in druid-historical druid-historical-for-query-retry-test druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-indexer druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop; do - docker stop $node - docker rm $node + CONTAINER="$(docker ps -aq -f name=${node})" + + if [ ! -z "$CONTAINER" ] + then + docker stop $node + docker rm $node + fi done -docker network rm druid-it-net +if [ ! 
-z "$(docker network ls -q -f name=druid-it-net)" ] +then + docker network rm druid-it-net +fi diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 14996949309b..5b2c5cc332df 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.druid.client.CachingQueryRunner; @@ -295,6 +296,12 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final ); } + @VisibleForTesting + String getDataSource() + { + return dataSource; + } + /** * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator. diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 02f75c79900d..087ac2f8534a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -34,6 +34,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; @@ -42,7 +43,9 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexSpec; @@ -258,14 +261,7 @@ public QueryRunner getQueryRunnerForIntervals( Iterable intervals ) { - DatasourceBundle datasourceBundle; - synchronized (this) { - datasourceBundle = datasourceBundles.get(query.getDataSource().toString()); - if (datasourceBundle == null) { - throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString()); - } - } - return datasourceBundle.getWalker().getQueryRunnerForIntervals(query, intervals); + return getBundle(query).getWalker().getQueryRunnerForIntervals(query, intervals); } @Override @@ -274,14 +270,29 @@ public QueryRunner getQueryRunnerForSegments( Iterable specs ) { - DatasourceBundle datasourceBundle; + return getBundle(query).getWalker().getQueryRunnerForSegments(query, specs); + } + + @VisibleForTesting + DatasourceBundle getBundle(final Query query) + { + final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource()); + + final TableDataSource table = 
+ analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + + final DatasourceBundle bundle; + synchronized (this) { - datasourceBundle = datasourceBundles.get(query.getDataSource().toString()); - if (datasourceBundle == null) { - throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString()); - } + bundle = datasourceBundles.get(table.getName()); + } + + if (bundle == null) { + throw new IAE("Could not find segment walker for datasource [%s]", table.getName()); } - return datasourceBundle.getWalker().getQueryRunnerForSegments(query, specs); + + return bundle; } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java index b7ad83cfa436..3b01073bbb80 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java @@ -20,7 +20,11 @@ package org.apache.druid.server.coordination; /** - * Use announcement made by {@link org.apache.druid.discovery.DruidNodeAnnouncer} + * We are gradually migrating usages of this to {@link org.apache.druid.discovery.DruidNodeAnnouncer}. + * + * However, it's still required in some cases. As of this writing (2020-12-03) it's required for any process that + * is serving queryable segments via Curator-based segment discovery. (When using Curator for segment discovery, Brokers + * look for these announcements as part of discovering what segments are available.) */ @Deprecated public interface DataSegmentServerAnnouncer diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index e9d2f075c34e..33d4df915140 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference; /** + * */ @ManageLifecycle public class SegmentLoadDropHandler implements DataSegmentChangeHandler @@ -85,6 +86,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final DataSegmentServerAnnouncer serverAnnouncer; private final SegmentManager segmentManager; private final ScheduledExecutorService exec; + private final ServerTypeConfig serverTypeConfig; private final ConcurrentSkipListSet segmentsToDelete; private volatile boolean started = false; @@ -139,8 +141,9 @@ public SegmentLoadDropHandler( this.announcer = announcer; this.serverAnnouncer = serverAnnouncer; this.segmentManager = segmentManager; - this.exec = exec; + this.serverTypeConfig = serverTypeConfig; + this.segmentsToDelete = new ConcurrentSkipListSet<>(); requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); } @@ -157,6 +160,9 @@ public void start() throws IOException try { if (!config.getLocations().isEmpty()) { loadLocalCache(); + } + + if (shouldAnnounce()) { serverAnnouncer.announce(); } } @@ -179,7 +185,7 @@ public void stop() log.info("Stopping..."); try { - if (!config.getLocations().isEmpty()) { + if (shouldAnnounce()) { serverAnnouncer.unannounce(); } } @@ -258,7 +264,8 @@ private void loadLocalCache() * * @throws 
SegmentLoadingException if it fails to load the given segment */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) + throws SegmentLoadingException { final boolean loaded; try { @@ -566,6 +573,21 @@ private void resolveWaitingFutures() } } + /** + * Returns whether or not we should announce ourselves as a data server using {@link DataSegmentServerAnnouncer}. + * + * Returns true if _either_: + * + * (1) Our {@link #serverTypeConfig} indicates we are a segment server. This is necessary for Brokers to be able + * to detect that we exist. + * (2) We have non-empty storage locations in {@link #config}. This is necessary for Coordinators to be able to + * assign segments to us. + */ + private boolean shouldAnnounce() + { + return serverTypeConfig.getServerType().isSegmentServer() || !config.getLocations().isEmpty(); + } + private static class BackgroundSegmentAnnouncer implements AutoCloseable { private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java index 0b860a1b0afe..b9bf2f81f355 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java @@ -47,14 +47,46 @@ */ public enum ServerType { - HISTORICAL, - BRIDGE, + HISTORICAL { + @Override + public boolean isSegmentReplicationTarget() + { + return true; + } + + @Override + public boolean isSegmentServer() + { + return true; + } + }, + + BRIDGE { + @Override + public boolean isSegmentReplicationTarget() + { + return true; + } + + @Override + public boolean isSegmentServer() + { + return true; + } + }, + INDEXER_EXECUTOR { @Override public boolean isSegmentReplicationTarget() { return false; } + + @Override + public boolean isSegmentServer() + { + return true; + } }, REALTIME { @@ -63,6 +95,12 @@ public boolean isSegmentReplicationTarget() { return false; } + + @Override + public boolean isSegmentServer() + { + return true; + } }, BROKER { @@ -71,6 +109,12 @@ public boolean isSegmentReplicationTarget() { return false; } + + @Override + public boolean isSegmentServer() + { + return false; + } }; /** @@ -80,10 +124,7 @@ public boolean isSegmentReplicationTarget() * * @see org.apache.druid.server.coordinator.rules.LoadRule */ - public boolean isSegmentReplicationTarget() - { - return true; - } + public abstract boolean isSegmentReplicationTarget(); /** * Indicates this type of node is able to be a target of segment broadcast. @@ -95,6 +136,13 @@ public boolean isSegmentBroadcastTarget() return true; } + /** + * Indicates this type of node is serving segments that are meant to be the target of fan-out by a Broker. + * + * Nodes that return "true" here are often referred to as "data servers" or "data server processes". 
+ */ + public abstract boolean isSegmentServer(); + @JsonCreator public static ServerType fromString(String type) { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java index e1d6620875f8..49356d05975a 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java @@ -38,7 +38,10 @@ import java.util.concurrent.ExecutorService; /** - * Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops. + * We are gradually migrating to {@link org.apache.druid.server.http.SegmentListerResource} for driving segment + * loads/drops on data server processes. + * + * However, this class is still the default mechanism as of this writing (2020-12-03). */ @Deprecated public class ZkCoordinator diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java new file mode 100644 index 000000000000..f7c85b2ccd08 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.realtime.appenderator; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.client.cache.CachePopulatorStats; +import org.apache.druid.client.cache.MapCache; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Druids; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.NoopRowIngestionMeters; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.join.NoopJoinableFactory; +import org.apache.druid.segment.loading.NoopDataSegmentPusher; +import org.apache.druid.segment.realtime.FireDepartmentMetrics; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.Collections; + +public class UnifiedIndexerAppenderatorsManagerTest +{ + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager( + Execs.directExecutor(), + NoopJoinableFactory.INSTANCE, + new WorkerConfig(), + MapCache.create(10), + new CacheConfig(), + new CachePopulatorStats(), + TestHelper.makeJsonMapper(), + new NoopServiceEmitter(), + () -> new DefaultQueryRunnerFactoryConglomerate(ImmutableMap.of()) + ); + + private final Appenderator appenderator = manager.createOfflineAppenderatorForTask( + "taskId", + new DataSchema( + "myDataSource", + new TimestampSpec("__time", "millis", null), + null, + null, + new UniformGranularitySpec(Granularities.HOUR, Granularities.HOUR, false, Collections.emptyList()), + null + ), + EasyMock.createMock(AppenderatorConfig.class), + new FireDepartmentMetrics(), + new NoopDataSegmentPusher(), + TestHelper.makeJsonMapper(), + TestHelper.getTestIndexIO(), + TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()), + new NoopRowIngestionMeters(), + new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0) + ); + + @Test + public void test_getBundle_knownDataSource() + { + final UnifiedIndexerAppenderatorsManager.DatasourceBundle bundle = manager.getBundle( + Druids.newScanQueryBuilder() + .dataSource(appenderator.getDataSource()) + .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .build() + ); + + Assert.assertEquals("myDataSource", bundle.getWalker().getDataSource()); + } + + @Test + public void test_getBundle_unknownDataSource() + { + final ScanQuery query = Druids.newScanQueryBuilder() + .dataSource("unknown") + .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .build(); + + expectedException.expect(IllegalArgumentException.class); + 
expectedException.expectMessage("Could not find segment walker for datasource"); + + manager.getBundle(query); + } +}
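For readers skimming the `UnifiedIndexerAppenderatorsManager` hunks above, here is a small standalone sketch (not part of the patch; the class name `BundleKeyExample` and the printed output are illustrative only) of how the reworked `getBundle()` derives its lookup key. The earlier code keyed the bundle map on `query.getDataSource().toString()`, which lines up with the registered datasource name only for simple table datasources; the new code asks `DataSourceAnalysis` for the base `TableDataSource` and uses its name, so a query whose datasource wraps a single base table still resolves to the right walker.

```java
// Illustrative sketch only: reproduces the key-derivation logic of the new getBundle()
// outside the manager class. Not part of the patch.
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;

public class BundleKeyExample
{
  public static void main(String[] args)
  {
    // Analyze the query's datasource and pull out the base table, as getBundle() now does.
    final DataSourceAnalysis analysis =
        DataSourceAnalysis.forDataSource(new TableDataSource("myDataSource"));

    final String bundleKey = analysis
        .getBaseTableDataSource()
        .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()))
        .getName();

    // The bundle map in UnifiedIndexerAppenderatorsManager is keyed by this plain name.
    System.out.println(bundleKey); // prints: myDataSource
  }
}
```

The new `UnifiedIndexerAppenderatorsManagerTest` added at the end of this diff exercises the same path end to end: `test_getBundle_knownDataSource` resolves the walker for `myDataSource`, and `test_getBundle_unknownDataSource` expects the `IllegalArgumentException` ("Could not find segment walker for datasource") raised when no bundle is registered.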