From fc050fca28df7cb5d3f990d7c603da9b9120d513 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 26 Dec 2018 11:00:16 -0800 Subject: [PATCH 1/7] Add support maxRowsPerSegment for auto compaction --- docs/content/configuration/index.md | 15 +- docs/content/ingestion/compaction.md | 7 +- docs/content/ingestion/native_tasks.md | 12 +- docs/content/tutorials/tutorial-batch.md | 2 +- docs/content/tutorials/tutorial-compaction.md | 4 +- .../tutorials/tutorial-ingestion-spec.md | 4 +- docs/content/tutorials/tutorial-rollup.md | 2 +- .../tutorials/tutorial-transform-spec.md | 2 +- .../tutorial/compaction-final-index.json | 2 +- .../tutorial/compaction-init-index.json | 2 +- .../quickstart/tutorial/deletion-index.json | 2 +- .../quickstart/tutorial/retention-index.json | 2 +- .../quickstart/tutorial/rollup-index.json | 2 +- .../quickstart/tutorial/transform-index.json | 2 +- .../tutorial/updates-append-index.json | 2 +- .../tutorial/updates-append-index2.json | 2 +- .../tutorial/updates-init-index.json | 2 +- .../tutorial/updates-overwrite-index.json | 2 +- .../quickstart/tutorial/wikipedia-index.json | 2 +- .../kafka/KafkaIndexTaskTuningConfigTest.java | 6 +- .../KafkaSupervisorTuningConfigTest.java | 4 +- .../KinesisIndexTaskTuningConfigTest.java | 6 +- .../KinesisSupervisorTuningConfigTest.java | 4 +- .../RealtimeAppenderatorTuningConfig.java | 2 +- .../indexing/common/task/CompactionTask.java | 208 +++++++++++-- .../druid/indexing/common/task/IndexTask.java | 96 +++--- .../batch/parallel/ParallelIndexSubTask.java | 24 +- .../parallel/ParallelIndexSupervisorTask.java | 3 +- .../parallel/ParallelIndexTuningConfig.java | 5 +- .../SeekableStreamIndexTaskTuningConfig.java | 2 +- .../common/task/CompactionTaskTest.java | 283 +++++++++++++----- .../indexing/common/task/IndexTaskTest.java | 32 +- .../indexing/common/task/TaskSerdeTest.java | 24 +- ...rallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../ParallelIndexSupervisorTaskTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 3 + .../indexer/wikipedia_index_task.json | 2 +- .../client/indexing/ClientCompactQuery.java | 18 +- .../ClientCompactQueryTuningConfig.java | 87 ++++-- .../indexing/HttpIndexingServiceClient.java | 2 +- .../indexing/IndexingServiceClient.java | 2 +- .../appenderator/AppenderatorConfig.java | 3 +- .../AppenderatorDriverAddResult.java | 15 +- .../DataSourceCompactionConfig.java | 114 ++++--- .../DruidCoordinatorSegmentCompactor.java | 6 +- .../helper/NewestSegmentFirstIterator.java | 2 +- .../indexing/NoopIndexingServiceClient.java | 2 +- .../DataSourceCompactionConfigTest.java | 106 +++++++ .../DruidCoordinatorSegmentCompactorTest.java | 4 +- .../helper/NewestSegmentFirstPolicyTest.java | 1 + 51 files changed, 779 insertions(+), 360 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 92942a068cfc..74d5f6d595f9 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -806,10 +806,11 @@ A description of the compaction config is: |`keepSegmentGranularity`|Set [keepSegmentGranularity](../ingestion/compaction.html) to true for compactionTask.|no (default = true)| |`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)| |`inputSegmentSizeBytes`|Total input segment size of a compactionTask.|no (default = 419430400)| 
-|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)|
+|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400 if `maxRowsPerSegment` is not specified)|
+|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
|`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)|
|`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
-|`tuningConfig`|Tuning config for compact tasks. See below [Compact Task TuningConfig](#compact-task-tuningconfig).|no|
+|`tuningConfig`|Tuning config for compact tasks. See [Compaction TuningConfig](#compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|
An example of compaction config is:
@@ -822,6 +823,16 @@ An example of compaction config is:
For realtime dataSources, it's recommended to set `skipOffsetFromLatest` to some sufficiently large value to avoid frequent compact task failures.
+##### Compaction TuningConfig
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 1000000)|
+|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 20000000)|
+|`indexSpec`|See [IndexSpec](../ingestion/native_tasks.html#indexspec)|no|
+|`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))|
+|`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 0)|
+
## Overlord
For general Overlord Node information, see [here](../design/indexing-service.html).
diff --git a/docs/content/ingestion/compaction.md b/docs/content/ingestion/compaction.md
index cd7345f04f8b..2b7b4dd0f000 100644
--- a/docs/content/ingestion/compaction.md
+++ b/docs/content/ingestion/compaction.md
@@ -46,9 +46,10 @@ Compaction tasks merge all segments of the given interval. The syntax is:
|`id`|Task id|No|
|`dataSource`|DataSource name to be compacted|Yes|
|`interval`|Interval of segments to be compacted|Yes|
-|`dimensions`|Custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
+|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if specified instead of generating one. See below for more details.|No|
+|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
|`keepSegmentGranularity`|If set to true, compactionTask will keep the time chunk boundaries and merge segments only if they fall into the same time chunk.|No (default = true)|
-|`targetCompactionSizeBytes`|Target segment size after comapction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
+|`targetCompactionSizeBytes`|Target segment size after compaction. 
Cannot be used with `maxRowsPerSegment`, `maxTotalRows`, and `numShards` in tuningConfig.|No| |`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No| |`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No| @@ -64,7 +65,7 @@ An example of compaction task is This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01` and results in new segments. Note that intervals of the input segments are merged into a single interval of `2017-01-01/2018-01-01` no matter what the segmentGranularity was. -To control the number of result segments, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more details. +To control the number of result segments, you can set `maxRowsPerSegment` or `numShards`. See [indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more details. To merge each day's worth of data into separate segments, you can submit multiple `compact` tasks, one for each day. They will run in parallel. A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters. diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index f4d4061c5752..1f2c45319147 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -152,11 +152,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |property|description|default|required?| |--------|-----------|-------|---------| |type|The task type, this should always be `index_parallel`.|none|yes| -|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no| +|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. 
If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no| @@ -337,7 +337,7 @@ An example of the result is }, "tuningConfig": { "type": "index_parallel", - "targetPartitionSize": 5000000, + "maxRowsPerSegment": 5000000, "maxRowsInMemory": 1000000, "maxTotalRows": 20000000, "numShards": null, @@ -454,7 +454,7 @@ The Local Index Task is designed to be used for smaller data sets. The task exec }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 1000000 } } @@ -491,11 +491,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |property|description|default|required?| |--------|-----------|-------|---------| |type|The task type, this should always be "index".|none|yes| -|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| -|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no| +|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. 
Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| diff --git a/docs/content/tutorials/tutorial-batch.md b/docs/content/tutorials/tutorial-batch.md index ebe10bb39b05..972569f49d1b 100644 --- a/docs/content/tutorials/tutorial-batch.md +++ b/docs/content/tutorials/tutorial-batch.md @@ -98,7 +98,7 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12 }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/docs/content/tutorials/tutorial-compaction.md b/docs/content/tutorials/tutorial-compaction.md index ac4445e91ca5..daf1f3c851f8 100644 --- a/docs/content/tutorials/tutorial-compaction.md +++ b/docs/content/tutorials/tutorial-compaction.md @@ -74,7 +74,7 @@ We have included a compaction task spec for this tutorial datasource at `quickst "interval": "2015-09-12/2015-09-13", "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } @@ -85,7 +85,7 @@ This will compact all segments for the interval `2015-09-12/2015-09-13` in the ` The parameters in the `tuningConfig` control how many segments will be present in the compacted set of segments. -In this tutorial example, only one compacted segment will be created, as the 39244 rows in the input is less than the 5000000 `targetPartitionSize`. +In this tutorial example, only one compacted segment will be created, as the 39244 rows in the input is less than the 5000000 `maxRowsPerSegment`. Let's submit this task now: diff --git a/docs/content/tutorials/tutorial-ingestion-spec.md b/docs/content/tutorials/tutorial-ingestion-spec.md index 1c939aee900e..3311a6f22ffe 100644 --- a/docs/content/tutorials/tutorial-ingestion-spec.md +++ b/docs/content/tutorials/tutorial-ingestion-spec.md @@ -564,7 +564,7 @@ As an example, let's add a `tuningConfig` that sets a target segment size for th ```json "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000 + "maxRowsPerSegment" : 5000000 } ``` @@ -623,7 +623,7 @@ We've finished defining the ingestion spec, it should now look like the followin }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000 + "maxRowsPerSegment" : 5000000 } } } diff --git a/docs/content/tutorials/tutorial-rollup.md b/docs/content/tutorials/tutorial-rollup.md index 1828dba76e0f..013fa0ecadab 100644 --- a/docs/content/tutorials/tutorial-rollup.md +++ b/docs/content/tutorials/tutorial-rollup.md @@ -99,7 +99,7 @@ We'll ingest this data using the following ingestion task spec, located at `quic }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/docs/content/tutorials/tutorial-transform-spec.md b/docs/content/tutorials/tutorial-transform-spec.md index ba9933c4a231..60d7bf868487 100644 --- a/docs/content/tutorials/tutorial-transform-spec.md +++ b/docs/content/tutorials/tutorial-transform-spec.md @@ -114,7 +114,7 @@ We will ingest the sample data using the following spec, which demonstrates the }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git 
a/examples/quickstart/tutorial/compaction-final-index.json b/examples/quickstart/tutorial/compaction-final-index.json index b84e2ed78d50..a7ff42253c4b 100644 --- a/examples/quickstart/tutorial/compaction-final-index.json +++ b/examples/quickstart/tutorial/compaction-final-index.json @@ -4,7 +4,7 @@ "interval": "2015-09-12/2015-09-13", "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/compaction-init-index.json b/examples/quickstart/tutorial/compaction-init-index.json index 74a00df2d83a..90ee82657568 100644 --- a/examples/quickstart/tutorial/compaction-init-index.json +++ b/examples/quickstart/tutorial/compaction-init-index.json @@ -56,7 +56,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/deletion-index.json b/examples/quickstart/tutorial/deletion-index.json index 57d1ccd351cc..d84dc21e4c45 100644 --- a/examples/quickstart/tutorial/deletion-index.json +++ b/examples/quickstart/tutorial/deletion-index.json @@ -56,7 +56,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/retention-index.json b/examples/quickstart/tutorial/retention-index.json index 613ddcfb562d..9aee62a48d8c 100644 --- a/examples/quickstart/tutorial/retention-index.json +++ b/examples/quickstart/tutorial/retention-index.json @@ -56,7 +56,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/rollup-index.json b/examples/quickstart/tutorial/rollup-index.json index 482c75104aca..b4d96aad26e4 100644 --- a/examples/quickstart/tutorial/rollup-index.json +++ b/examples/quickstart/tutorial/rollup-index.json @@ -43,7 +43,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/transform-index.json b/examples/quickstart/tutorial/transform-index.json index 0dfcef3e8603..ea8ead758e99 100644 --- a/examples/quickstart/tutorial/transform-index.json +++ b/examples/quickstart/tutorial/transform-index.json @@ -65,7 +65,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/updates-append-index.json b/examples/quickstart/tutorial/updates-append-index.json index dfa9887d75f6..b8de06ae439a 100644 --- a/examples/quickstart/tutorial/updates-append-index.json +++ b/examples/quickstart/tutorial/updates-append-index.json @@ -51,7 +51,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/updates-append-index2.json b/examples/quickstart/tutorial/updates-append-index2.json index 0e7404ae9556..a5c49c494066 100644 --- a/examples/quickstart/tutorial/updates-append-index2.json +++ 
b/examples/quickstart/tutorial/updates-append-index2.json @@ -41,7 +41,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/updates-init-index.json b/examples/quickstart/tutorial/updates-init-index.json index 52a4aef7e8f3..3620ff16d886 100644 --- a/examples/quickstart/tutorial/updates-init-index.json +++ b/examples/quickstart/tutorial/updates-init-index.json @@ -41,7 +41,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/updates-overwrite-index.json b/examples/quickstart/tutorial/updates-overwrite-index.json index ac4785e749b8..95f2e2dbd773 100644 --- a/examples/quickstart/tutorial/updates-overwrite-index.json +++ b/examples/quickstart/tutorial/updates-overwrite-index.json @@ -41,7 +41,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/examples/quickstart/tutorial/wikipedia-index.json b/examples/quickstart/tutorial/wikipedia-index.json index 02177d142c23..888e08ef5301 100644 --- a/examples/quickstart/tutorial/wikipedia-index.json +++ b/examples/quickstart/tutorial/wikipedia-index.json @@ -56,7 +56,7 @@ }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : 5000000, + "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000, "forceExtendableShardSpecs" : true } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index abcf07f6c10b..14ecd4e90e82 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -58,7 +58,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); - Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(null, config.getMaxTotalRows()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -94,7 +94,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); Assert.assertEquals(100, config.getMaxRowsInMemory()); - Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertNotEquals(null, config.getMaxTotalRows()); Assert.assertEquals(1000, config.getMaxTotalRows().longValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); @@ -134,7 +134,7 @@ public void testConvert() KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig(); Assert.assertEquals(1, copy.getMaxRowsInMemory()); - Assert.assertEquals(2, copy.getMaxRowsPerSegment()); + Assert.assertEquals(2, 
copy.getMaxRowsPerSegment().intValue()); Assert.assertNotEquals(null, copy.getMaxTotalRows()); Assert.assertEquals(10L, copy.getMaxTotalRows().longValue()); Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 45470ffe10de..3312a10aa944 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -59,7 +59,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); - Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -105,7 +105,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); Assert.assertEquals(100, config.getMaxRowsInMemory()); - Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 4e967c414a9f..2983324b0ce4 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -65,7 +65,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); - Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -115,7 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); Assert.assertEquals(100, config.getMaxRowsInMemory()); - Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertTrue(config.getBuildV9Directly()); @@ -197,7 +197,7 @@ public void 
testConvert() Assert.assertEquals(1, copy.getMaxRowsInMemory()); Assert.assertEquals(3, copy.getMaxBytesInMemory()); - Assert.assertEquals(2, copy.getMaxRowsPerSegment()); + Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue()); Assert.assertEquals(100L, (long) copy.getMaxTotalRows()); Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod()); Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index 370f3ea2ea0d..45151b51bdb7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -59,7 +59,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNotNull(config.getBasePersistDirectory()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); - Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -105,7 +105,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); Assert.assertEquals(100, config.getMaxRowsInMemory()); - Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.getBuildV9Directly()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 6f2052dbef72..c05520267585 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -149,7 +149,7 @@ public long getMaxBytesInMemory() @Override @JsonProperty - public int getMaxRowsPerSegment() + public Integer getMaxRowsPerSegment() { return maxRowsPerSegment; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 3c6992374f9c..018a85b1861b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -103,7 +103,10 @@ public class CompactionTask extends AbstractTask private final Interval interval; private final List segments; + @Nullable private final DimensionsSpec dimensionsSpec; + @Nullable + private final AggregatorFactory[] metricsSpec; private final boolean keepSegmentGranularity; @Nullable private final 
Long targetCompactionSizeBytes; @@ -134,7 +137,9 @@ public CompactionTask( @JsonProperty("dataSource") final String dataSource, @Nullable @JsonProperty("interval") final Interval interval, @Nullable @JsonProperty("segments") final List segments, - @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec, + @Deprecated @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensions, + @Nullable @JsonProperty("dimensionsSpec") final DimensionsSpec dimensionsSpec, + @Nullable @JsonProperty("metricsSpec") final AggregatorFactory[] metricsSpec, @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity, @Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes, @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig, @@ -155,7 +160,8 @@ public CompactionTask( this.interval = interval; this.segments = segments; - this.dimensionsSpec = dimensionsSpec; + this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; + this.metricsSpec = metricsSpec; this.keepSegmentGranularity = keepSegmentGranularity == null ? DEFAULT_KEEP_SEGMENT_GRANULARITY : keepSegmentGranularity; @@ -163,7 +169,7 @@ public CompactionTask( this.tuningConfig = tuningConfig; this.jsonMapper = jsonMapper; this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments); - this.partitionConfigurationManager = new PartitionConfigurationManager(this.targetCompactionSizeBytes, tuningConfig); + this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; @@ -187,6 +193,12 @@ public DimensionsSpec getDimensionsSpec() return dimensionsSpec; } + @JsonProperty + public AggregatorFactory[] getMetricsSpec() + { + return metricsSpec; + } + @JsonProperty public boolean isKeepSegmentGranularity() { @@ -242,6 +254,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception segmentProvider, partitionConfigurationManager, dimensionsSpec, + metricsSpec, keepSegmentGranularity, jsonMapper ).stream() @@ -299,7 +312,8 @@ static List createIngestionSchema( final TaskToolbox toolbox, final SegmentProvider segmentProvider, final PartitionConfigurationManager partitionConfigurationManager, - final DimensionsSpec dimensionsSpec, + @Nullable final DimensionsSpec dimensionsSpec, + @Nullable final AggregatorFactory[] metricsSpec, final boolean keepSegmentGranularity, final ObjectMapper jsonMapper ) throws IOException, SegmentLoadingException @@ -353,6 +367,7 @@ static List createIngestionSchema( interval, segmentsToCompact, dimensionsSpec, + metricsSpec, jsonMapper ); @@ -372,6 +387,7 @@ static List createIngestionSchema( segmentProvider.interval, queryableIndexAndSegments, dimensionsSpec, + metricsSpec, jsonMapper ); @@ -418,26 +434,18 @@ private static DataSchema createDataSchema( String dataSource, Interval totalInterval, List> queryableIndexAndSegments, - DimensionsSpec dimensionsSpec, + @Nullable DimensionsSpec dimensionsSpec, + @Nullable AggregatorFactory[] metricsSpec, ObjectMapper jsonMapper ) { - // find merged aggregators + // check index metadata for (Pair pair : queryableIndexAndSegments) { final QueryableIndex index = pair.lhs; if (index.getMetadata() == null) { throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getIdentifier()); } } - final List 
aggregatorFactories = queryableIndexAndSegments - .stream() - .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata() - .collect(Collectors.toList()); - final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories); - - if (mergedAggregators == null) { - throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories); - } // find granularity spec // set rollup only if rollup is set for all segments @@ -454,21 +462,47 @@ private static DataSchema createDataSchema( ); // find unique dimensions - final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ? - createDimensionsSpec(queryableIndexAndSegments) : - dimensionsSpec; + final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null + ? createDimensionsSpec(queryableIndexAndSegments) + : dimensionsSpec; + final AggregatorFactory[] finalMetricsSpec = metricsSpec == null + ? createMetricsSpec(queryableIndexAndSegments) + : convertToCombiningFactories(metricsSpec); final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec)); return new DataSchema( dataSource, jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT), - mergedAggregators, + finalMetricsSpec, granularitySpec, null, jsonMapper ); } + private static AggregatorFactory[] createMetricsSpec( + List> queryableIndexAndSegments + ) + { + final List aggregatorFactories = queryableIndexAndSegments + .stream() + .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata() + .collect(Collectors.toList()); + final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories); + + if (mergedAggregators == null) { + throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories); + } + return mergedAggregators; + } + + private static AggregatorFactory[] convertToCombiningFactories(AggregatorFactory[] metricsSpec) + { + return Arrays.stream(metricsSpec) + .map(AggregatorFactory::getCombiningFactory) + .toArray(AggregatorFactory[]::new); + } + private static DimensionsSpec createDimensionsSpec(List> queryableIndices) { final BiMap uniqueDims = HashBiMap.create(); @@ -693,7 +727,7 @@ IndexTuningConfig computeTuningConfig(List> qu targetCompactionSizeBytes, "targetCompactionSizeBytes" ); - // Find IndexTuningConfig.targetPartitionSize which is the number of rows per segment. + // Find IndexTuningConfig.maxRowsPerSegment which is the number of rows per segment. // Assume that the segment size is proportional to the number of rows. We can improve this later. 
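      // Illustrative arithmetic (not part of this patch): if the input segments hold 10,000,000 rows in
      // 1,000,000,000 bytes, then avgRowsPerByte computed below is 0.01, so the default
      // targetCompactionSizeBytes of 419430400 maps to maxRowsPerSegment = Math.round(0.01 * 419430400) = 4194304.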
final long totalNumRows = queryableIndexAndSegments .stream() @@ -709,17 +743,17 @@ IndexTuningConfig computeTuningConfig(List> qu } final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes; - final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes)); - Preconditions.checkState(targetPartitionSize > 0, "Negative targetPartitionSize[%s]", targetPartitionSize); + final int maxRowsPerSegment = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes)); + Preconditions.checkState(maxRowsPerSegment > 0, "Negative maxRowsPerSegment[%s]", maxRowsPerSegment); log.info( - "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]", - targetPartitionSize, + "Estimated maxRowsPerSegment[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]", + maxRowsPerSegment, avgRowsPerByte, nonNullTargetCompactionSizeBytes ); return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig) - .withTargetPartitionSize(targetPartitionSize); + .withMaxRowsPerSegment(maxRowsPerSegment); } else { return tuningConfig; } @@ -727,7 +761,7 @@ IndexTuningConfig computeTuningConfig(List> qu /** * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that - * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#targetPartitionSize}, + * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment}, * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together. * {@link #hasPartitionConfig} checks one of those configs is set. * @@ -742,15 +776,15 @@ private static Long getValidTargetCompactionSizeBytes( @Nullable IndexTuningConfig tuningConfig ) { - if (targetCompactionSizeBytes != null) { + if (targetCompactionSizeBytes != null && tuningConfig != null) { Preconditions.checkArgument( !hasPartitionConfig(tuningConfig), - "targetCompactionSizeBytes[%s] cannot be used with targetPartitionSize[%s], maxTotalRows[%s]," + "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s], maxTotalRows[%s]," + " or numShards[%s] of tuningConfig", targetCompactionSizeBytes, - tuningConfig == null ? null : tuningConfig.getTargetPartitionSize(), - tuningConfig == null ? null : tuningConfig.getMaxTotalRows(), - tuningConfig == null ? 
null : tuningConfig.getNumShards() + tuningConfig.getMaxRowsPerSegment(), + tuningConfig.getMaxTotalRows(), + tuningConfig.getNumShards() ); return targetCompactionSizeBytes; } else { @@ -763,7 +797,7 @@ private static Long getValidTargetCompactionSizeBytes( private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig) { if (tuningConfig != null) { - return tuningConfig.getTargetPartitionSize() != null + return tuningConfig.getMaxRowsPerSegment() != null || tuningConfig.getMaxTotalRows() != null || tuningConfig.getNumShards() != null; } else { @@ -771,4 +805,114 @@ private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConf } } } + + public static class Builder + { + private final String dataSource; + private final ObjectMapper jsonMapper; + private final AuthorizerMapper authorizerMapper; + private final ChatHandlerProvider chatHandlerProvider; + private final RowIngestionMetersFactory rowIngestionMetersFactory; + + @Nullable + private Interval interval; + @Nullable + private List segments; + @Nullable + private DimensionsSpec dimensionsSpec; + @Nullable + private AggregatorFactory[] metricsSpec; + private boolean keepSegmentGranularity; + @Nullable + private Long targetCompactionSizeBytes; + @Nullable + private IndexTuningConfig tuningConfig; + @Nullable + private Map context; + + public Builder( + String dataSource, + ObjectMapper jsonMapper, + AuthorizerMapper authorizerMapper, + ChatHandlerProvider chatHandlerProvider, + RowIngestionMetersFactory rowIngestionMetersFactory + ) + { + this.dataSource = dataSource; + this.jsonMapper = jsonMapper; + this.authorizerMapper = authorizerMapper; + this.chatHandlerProvider = chatHandlerProvider; + this.rowIngestionMetersFactory = rowIngestionMetersFactory; + } + + public Builder interval(Interval interval) + { + this.interval = interval; + return this; + } + + public Builder segments(List segments) + { + this.segments = segments; + return this; + } + + public Builder dimensionsSpec(DimensionsSpec dimensionsSpec) + { + this.dimensionsSpec = dimensionsSpec; + return this; + } + + public Builder metricsSpec(AggregatorFactory[] metricsSpec) + { + this.metricsSpec = metricsSpec; + return this; + } + + public Builder keepSegmentGranularity(boolean keepSegmentGranularity) + { + this.keepSegmentGranularity = keepSegmentGranularity; + return this; + } + + public Builder targetCompactionSizeBytes(long targetCompactionSizeBytes) + { + this.targetCompactionSizeBytes = targetCompactionSizeBytes; + return this; + } + + public Builder tuningConfig(IndexTuningConfig tuningConfig) + { + this.tuningConfig = tuningConfig; + return this; + } + + public Builder context(Map context) + { + this.context = context; + return this; + } + + public CompactionTask build() + { + return new CompactionTask( + null, + null, + dataSource, + interval, + segments, + null, + dimensionsSpec, + metricsSpec, + keepSegmentGranularity, + targetCompactionSizeBytes, + tuningConfig, + context, + jsonMapper, + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory + ); + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index c1e1397d1b01..42b45fdad899 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -430,9 +430,9 @@ public TaskStatus run(final TaskToolbox 
toolbox) // Initialize maxRowsPerSegment and maxTotalRows lazily final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig; - @Nullable final Integer targetPartitionSize = getValidTargetPartitionSize(tuningConfig); + @Nullable final Integer maxRowsPerSegment = getValidMaxRowsPerSegment(tuningConfig); @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig); - final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, targetPartitionSize); + final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, maxRowsPerSegment); final DataSchema dataSchema; final Map versions; if (determineIntervals) { @@ -469,7 +469,7 @@ public TaskStatus run(final TaskToolbox toolbox) versions, firehoseFactory, firehoseTempDir, - targetPartitionSize, + maxRowsPerSegment, maxTotalRows ); } @@ -598,7 +598,7 @@ private ShardSpecs determineShardSpecs( final TaskToolbox toolbox, final FirehoseFactory firehoseFactory, final File firehoseTempDir, - @Nullable final Integer targetPartitionSize + @Nullable final Integer maxRowsPerSegment ) throws IOException { final ObjectMapper jsonMapper = toolbox.getObjectMapper(); @@ -634,7 +634,7 @@ private ShardSpecs determineShardSpecs( tuningConfig, determineIntervals, determineNumPartitions, - targetPartitionSize + maxRowsPerSegment ); } } @@ -684,7 +684,7 @@ private ShardSpecs createShardSpecsFromInput( IndexTuningConfig tuningConfig, boolean determineIntervals, boolean determineNumPartitions, - @Nullable Integer targetPartitionSize + @Nullable Integer maxRowsPerSegment ) throws IOException { log.info("Determining intervals and shardSpecs"); @@ -710,7 +710,7 @@ private ShardSpecs createShardSpecsFromInput( if (determineNumPartitions) { final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound(); numShards = (int) Math.ceil( - (double) numRows / Preconditions.checkNotNull(targetPartitionSize, "targetPartitionSize") + (double) numRows / Preconditions.checkNotNull(maxRowsPerSegment, "maxRowsPerSegment") ); log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); } else { @@ -866,7 +866,7 @@ private static BiFunction getShardSpecCreateFunctio * *
    *
  • - * If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize} + * If the number of rows in a segment exceeds {@link IndexTuningConfig#maxRowsPerSegment} *
  • *
  • * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows} @@ -884,7 +884,7 @@ private TaskStatus generateAndPublishSegments( final Map versions, final FirehoseFactory firehoseFactory, final File firehoseTempDir, - @Nullable final Integer targetPartitionSize, + @Nullable final Integer maxRowsPerSegment, @Nullable final Long maxTotalRows ) throws IOException, InterruptedException { @@ -1031,9 +1031,7 @@ private TaskStatus generateAndPublishSegments( if (addResult.isOk()) { // incremental segment publishment is allowed only when rollup don't have to be perfect. - if (!isGuaranteedRollup && - (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) || - exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator()))) { + if (!isGuaranteedRollup && addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) { // There can be some segments waiting for being published even though any rows won't be added to them. // If those segments are not published here, the available space in appenderator will be kept to be small // which makes the size of segments smaller. @@ -1099,17 +1097,17 @@ private TaskStatus generateAndPublishSegments( /** * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null. - * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given - * {@link IndexTuningConfig#targetPartitionSize}. + * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_ROWS_PER_SEGMENT} or the given + * {@link IndexTuningConfig#maxRowsPerSegment}. */ - public static Integer getValidTargetPartitionSize(IndexTuningConfig tuningConfig) + public static Integer getValidMaxRowsPerSegment(IndexTuningConfig tuningConfig) { @Nullable final Integer numShards = tuningConfig.numShards; - @Nullable final Integer targetPartitionSize = tuningConfig.targetPartitionSize; + @Nullable final Integer maxRowsPerSegment = tuningConfig.maxRowsPerSegment; if (numShards == null || numShards == -1) { - return targetPartitionSize == null || targetPartitionSize.equals(-1) - ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE - : targetPartitionSize; + return maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) + ? 
IndexTuningConfig.DEFAULT_MAX_ROWS_PER_SEGMENT + : maxRowsPerSegment; } else { return null; } @@ -1154,23 +1152,6 @@ private void handleParseException(ParseException e) } } - private static boolean exceedMaxRowsInSegment( - @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig - int numRowsInSegment - ) - { - return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment; - } - - private static boolean exceedMaxRowsInAppenderator( - // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig - @Nullable final Long maxRowsInAppenderator, - long numRowsInAppenderator - ) - { - return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator; - } - private static SegmentsAndMetadata awaitPublish( ListenableFuture publishFuture, long publishTimeout @@ -1336,7 +1317,7 @@ public boolean isAppendToExisting() @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000; + static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); @@ -1347,7 +1328,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final long DEFAULT_PUSH_TIMEOUT = 0; @Nullable - private final Integer targetPartitionSize; + private final Integer maxRowsPerSegment; private final int maxRowsInMemory; private final long maxBytesInMemory; @Nullable @@ -1388,7 +1369,8 @@ public static IndexTuningConfig createDefault() @JsonCreator public IndexTuningConfig( - @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, + @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @@ -1412,7 +1394,7 @@ public IndexTuningConfig( ) { this( - targetPartitionSize, + maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, maxBytesInMemory != null ? 
maxBytesInMemory : 0, maxTotalRows, @@ -1430,6 +1412,11 @@ public IndexTuningConfig( maxParseExceptions, maxSavedParseExceptions ); + + Preconditions.checkArgument( + targetPartitionSize == null || maxRowsPerSegment == null, + "Can't use targetPartitionSize and maxRowsPerSegment together" + ); } private IndexTuningConfig() @@ -1438,7 +1425,7 @@ private IndexTuningConfig() } private IndexTuningConfig( - @Nullable Integer targetPartitionSize, + @Nullable Integer maxRowsPerSegment, @Nullable Integer maxRowsInMemory, @Nullable Long maxBytesInMemory, @Nullable Long maxTotalRows, @@ -1458,13 +1445,13 @@ private IndexTuningConfig( ) { Preconditions.checkArgument( - targetPartitionSize == null || targetPartitionSize.equals(-1) || numShards == null || numShards.equals(-1), - "targetPartitionSize and numShards cannot both be set" + maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) || numShards == null || numShards.equals(-1), + "maxRowsPerSegment and numShards cannot both be set" ); - this.targetPartitionSize = (targetPartitionSize != null && targetPartitionSize == -1) + this.maxRowsPerSegment = (maxRowsPerSegment != null && maxRowsPerSegment == -1) ? null - : targetPartitionSize; + : maxRowsPerSegment; this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; // initializing this to 0, it will be lazily initialized to a value // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) @@ -1505,7 +1492,7 @@ private IndexTuningConfig( public IndexTuningConfig withBasePersistDirectory(File dir) { return new IndexTuningConfig( - targetPartitionSize, + maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, maxTotalRows, @@ -1525,10 +1512,10 @@ public IndexTuningConfig withBasePersistDirectory(File dir) ); } - public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize) + public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) { return new IndexTuningConfig( - targetPartitionSize, + maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, maxTotalRows, @@ -1550,13 +1537,14 @@ public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize) /** * Return the target number of rows per segment. This returns null if it's not specified in tuningConfig. - * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get the valid value. + * Please use {@link IndexTask#getValidMaxRowsPerSegment} instead to get the valid value. 
*/ @Nullable @JsonProperty - public Integer getTargetPartitionSize() + @Override + public Integer getMaxRowsPerSegment() { - return targetPartitionSize; + return maxRowsPerSegment; } @JsonProperty @@ -1701,7 +1689,7 @@ public boolean equals(Object o) forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && - Objects.equals(targetPartitionSize, that.targetPartitionSize) && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && Objects.equals(numShards, that.numShards) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -1715,7 +1703,7 @@ public boolean equals(Object o) public int hashCode() { return Objects.hash( - targetPartitionSize, + maxRowsPerSegment, maxRowsInMemory, maxTotalRows, numShards, @@ -1737,7 +1725,7 @@ public int hashCode() public String toString() { return "IndexTuningConfig{" + - "targetPartitionSize=" + targetPartitionSize + + "maxRowsPerSegment=" + maxRowsPerSegment + ", maxRowsInMemory=" + maxRowsInMemory + ", maxBytesInMemory=" + maxBytesInMemory + ", maxTotalRows=" + maxTotalRows + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java index be72fc4b7561..18e87d758b22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java @@ -269,7 +269,7 @@ private SegmentAllocator createSegmentAllocator( * *
      *
    • - * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#targetPartitionSize} + * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#maxRowsPerSegment} *
    • *
    • * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link ParallelIndexTuningConfig#maxTotalRows} @@ -304,7 +304,7 @@ private List generateAndPushSegments( // Initialize maxRowsPerSegment and maxTotalRows lazily final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); - @Nullable final Integer targetPartitionSize = IndexTask.getValidTargetPartitionSize(tuningConfig); + @Nullable final Integer maxRowsPerSegment = IndexTask.getValidMaxRowsPerSegment(tuningConfig); @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig); final long pushTimeout = tuningConfig.getPushTimeout(); final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); @@ -350,8 +350,7 @@ private List generateAndPushSegments( final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName); if (addResult.isOk()) { - if (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) || - exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator())) { + if (addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) { // There can be some segments waiting for being published even though any rows won't be added to them. // If those segments are not published here, the available space in appenderator will be kept to be small // which makes the size of segments smaller. @@ -385,23 +384,6 @@ private List generateAndPushSegments( } } - private static boolean exceedMaxRowsInSegment( - @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig - int numRowsInSegment - ) - { - return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment; - } - - private static boolean exceedMaxRowsInAppenderator( - // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig - @Nullable Long maxRowsInAppenderator, - long numRowsInAppenderator - ) - { - return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator; - } - private static Appenderator newAppenderator( FireDepartmentMetrics metrics, TaskToolbox toolbox, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index aecbbc006893..53f05c635471 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -290,7 +290,8 @@ private TaskStatus runSequential(TaskToolbox toolbox) private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig) { return new IndexTuningConfig( - tuningConfig.getTargetPartitionSize(), + null, + tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxBytesInMemory(), tuningConfig.getMaxTotalRows(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 8f6239dbca2c..85929dbd880c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -70,13 +70,15 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @JsonCreator public ParallelIndexTuningConfig( - @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, + @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @@ -100,6 +102,7 @@ public ParallelIndexTuningConfig( { super( targetPartitionSize, + maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, maxTotalRows, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 816949ce4e16..3ebfe5915aa8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -147,7 +147,7 @@ public long getMaxBytesInMemory() @Override @JsonProperty - public int getMaxRowsPerSegment() + public Integer getMaxRowsPerSegment() { return maxRowsPerSegment; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 5f0946071459..8a859ee76e12 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; @@ -61,6 +62,8 @@ import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; +import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; @@ -118,7 +121,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -153,7 +156,7 @@ public class CompactionTaskTest private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig(); private static Map DIMENSIONS; - private static Map AGGREGATORS; + private static List AGGREGATORS; private static 
List SEGMENTS; private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); @@ -167,7 +170,7 @@ public class CompactionTaskTest public static void setupClass() { DIMENSIONS = new HashMap<>(); - AGGREGATORS = new HashMap<>(); + AGGREGATORS = new ArrayList<>(); DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME)); DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN)); @@ -192,11 +195,11 @@ public static void setupClass() DIMENSIONS.put(schema.getName(), schema); } - AGGREGATORS.put("agg_0", new CountAggregatorFactory("agg_0")); - AGGREGATORS.put("agg_1", new LongSumAggregatorFactory("agg_1", "long_dim_1")); - AGGREGATORS.put("agg_2", new LongMaxAggregatorFactory("agg_2", "long_dim_2")); - AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); - AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); + AGGREGATORS.add(new CountAggregatorFactory("agg_0")); + AGGREGATORS.add(new LongSumAggregatorFactory("agg_1", "long_dim_1")); + AGGREGATORS.add(new LongMaxAggregatorFactory("agg_2", "long_dim_2")); + AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3")); + AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4")); segmentMap = new HashMap<>(5); for (int i = 0; i < 5; i++) { @@ -208,7 +211,7 @@ public static void setupClass() "version", ImmutableMap.of(), findDimensions(i, segmentInterval), - new ArrayList<>(AGGREGATORS.keySet()), + AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()), new NumberedShardSpec(0, 1), 0, SEGMENT_SIZE_BYTES @@ -269,7 +272,8 @@ private static List findDimensions(int startIndex, Interval segmentInter private static IndexTuningConfig createTuningConfig() { return new IndexTuningConfig( - null, // null to compute targetPartitionSize automatically + null, + null, // null to compute maxRowsPerSegment automatically 500000, 1000000L, null, @@ -326,64 +330,88 @@ public CompactionTaskTest(boolean keepSegmentGranularity) @Test public void testSerdeWithInterval() throws IOException { - final CompactionTask task = new CompactionTask( - null, - null, + final Builder builder = new Builder( DATA_SOURCE, - COMPACTION_INTERVAL, - null, - null, - null, - null, - createTuningConfig(), - ImmutableMap.of("testKey", "testContext"), objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory ); + final CompactionTask task = builder + .interval(COMPACTION_INTERVAL) + .tuningConfig(createTuningConfig()) + .context(ImmutableMap.of("testKey", "testContext")) + .build(); + final byte[] bytes = objectMapper.writeValueAsBytes(task); final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); - Assert.assertEquals(task.getType(), fromJson.getType()); - Assert.assertEquals(task.getDataSource(), fromJson.getDataSource()); - Assert.assertEquals(task.getInterval(), fromJson.getInterval()); - Assert.assertEquals(task.getSegments(), fromJson.getSegments()); - Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec()); - Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig()); - Assert.assertEquals(task.getContext(), fromJson.getContext()); - Assert.assertNull(fromJson.getSegmentProvider().getSegments()); + assertEquals(task, fromJson); } @Test public void testSerdeWithSegments() 
throws IOException { - final CompactionTask task = new CompactionTask( - null, - null, + final Builder builder = new Builder( DATA_SOURCE, + objectMapper, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, - SEGMENTS, - null, - null, - null, - createTuningConfig(), - ImmutableMap.of("testKey", "testContext"), + rowIngestionMetersFactory + ); + final CompactionTask task = builder + .segments(SEGMENTS) + .tuningConfig(createTuningConfig()) + .context(ImmutableMap.of("testKey", "testContext")) + .build(); + + final byte[] bytes = objectMapper.writeValueAsBytes(task); + final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); + assertEquals(task, fromJson); + } + + @Test + public void testSerdeWithDimensions() throws IOException + { + final Builder builder = new Builder( + DATA_SOURCE, objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory ); + + final CompactionTask task = builder + .segments(SEGMENTS) + .dimensionsSpec( + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("dim3") + ) + ) + ) + .tuningConfig(createTuningConfig()) + .context(ImmutableMap.of("testKey", "testVal")) + .build(); + final byte[] bytes = objectMapper.writeValueAsBytes(task); final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); - Assert.assertEquals(task.getType(), fromJson.getType()); - Assert.assertEquals(task.getDataSource(), fromJson.getDataSource()); - Assert.assertEquals(task.getInterval(), fromJson.getInterval()); - Assert.assertEquals(task.getSegments(), fromJson.getSegments()); - Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec()); - Assert.assertEquals(task.isKeepSegmentGranularity(), fromJson.isKeepSegmentGranularity()); - Assert.assertEquals(task.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes()); - Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig()); - Assert.assertEquals(task.getContext(), fromJson.getContext()); + assertEquals(task, fromJson); + } + + private static void assertEquals(CompactionTask expected, CompactionTask actual) + { + Assert.assertEquals(expected.getType(), actual.getType()); + Assert.assertEquals(expected.getDataSource(), actual.getDataSource()); + Assert.assertEquals(expected.getInterval(), actual.getInterval()); + Assert.assertEquals(expected.getSegments(), actual.getSegments()); + Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec()); + Assert.assertTrue(Arrays.equals(expected.getMetricsSpec(), actual.getMetricsSpec())); + Assert.assertEquals(expected.isKeepSegmentGranularity(), actual.isKeepSegmentGranularity()); + Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes()); + Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig()); + Assert.assertEquals(expected.getContext(), actual.getContext()); } @Test @@ -394,6 +422,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(null, TUNING_CONFIG), null, + null, keepSegmentGranularity, objectMapper ); @@ -409,10 +438,15 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept ) ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); + assertIngestionSchema(ingestionSpecs, 
expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS); } else { Assert.assertEquals(1, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL)); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + Collections.singletonList(COMPACTION_INTERVAL) + ); } } @@ -420,6 +454,7 @@ public void testCreateIngestionSchema() throws IOException, SegmentLoadingExcept public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, 5, 500000, 1000000L, @@ -450,6 +485,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(null, tuningConfig), null, + null, keepSegmentGranularity, objectMapper ); @@ -465,12 +501,19 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio ) ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + tuningConfig + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, + AGGREGATORS, Collections.singletonList(COMPACTION_INTERVAL), tuningConfig ); @@ -481,6 +524,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, 500000, 1000000L, @@ -511,6 +555,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(null, tuningConfig), null, + null, keepSegmentGranularity, objectMapper ); @@ -526,12 +571,19 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm ) ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + SEGMENT_INTERVALS, + tuningConfig + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, + AGGREGATORS, Collections.singletonList(COMPACTION_INTERVAL), tuningConfig ); @@ -542,6 +594,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, 500000, 1000000L, @@ -572,6 +625,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(null, tuningConfig), null, + null, keepSegmentGranularity, objectMapper ); @@ -587,12 +641,19 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment ) ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + 
SEGMENT_INTERVALS, + tuningConfig + ); } else { Assert.assertEquals(1, ingestionSpecs.size()); assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, + AGGREGATORS, Collections.singletonList(COMPACTION_INTERVAL), tuningConfig ); @@ -634,6 +695,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(null, TUNING_CONFIG), customSpec, + null, keepSegmentGranularity, objectMapper ); @@ -651,6 +713,7 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti assertIngestionSchema( ingestionSpecs, dimensionsSpecs, + AGGREGATORS, SEGMENT_INTERVALS ); } else { @@ -658,6 +721,56 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti assertIngestionSchema( ingestionSpecs, Collections.singletonList(customSpec), + AGGREGATORS, + Collections.singletonList(COMPACTION_INTERVAL) + ); + } + } + + @Test + public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, SegmentLoadingException + { + final AggregatorFactory[] customMetricsSpec = new AggregatorFactory[]{ + new CountAggregatorFactory("custom_count"), + new LongSumAggregatorFactory("custom_long_sum", "agg_1"), + new FloatMinAggregatorFactory("custom_float_min", "agg_3"), + new DoubleMaxAggregatorFactory("custom_double_max", "agg_4") + }; + + final List ingestionSpecs = CompactionTask.createIngestionSchema( + toolbox, + new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), + new PartitionConfigurationManager(null, TUNING_CONFIG), + null, + customMetricsSpec, + keepSegmentGranularity, + objectMapper + ); + + final List expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration( + keepSegmentGranularity + ); + + if (keepSegmentGranularity) { + ingestionSpecs.sort( + (s1, s2) -> Comparators.intervalsByStartThenEnd().compare( + s1.getDataSchema().getGranularitySpec().inputIntervals().get(0), + s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) + ) + ); + Assert.assertEquals(5, ingestionSpecs.size()); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + Lists.newArrayList(customMetricsSpec), + SEGMENT_INTERVALS + ); + } else { + Assert.assertEquals(1, ingestionSpecs.size()); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + Lists.newArrayList(customMetricsSpec), Collections.singletonList(COMPACTION_INTERVAL) ); } @@ -671,6 +784,7 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se new SegmentProvider(SEGMENTS), new PartitionConfigurationManager(null, TUNING_CONFIG), null, + null, keepSegmentGranularity, objectMapper ); @@ -686,10 +800,15 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se ) ); Assert.assertEquals(5, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS); + assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS); } else { Assert.assertEquals(1, ingestionSpecs.size()); - assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL)); + assertIngestionSchema( + ingestionSpecs, + expectedDimensionsSpec, + AGGREGATORS, + Collections.singletonList(COMPACTION_INTERVAL) + ); } } @@ -708,6 +827,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio new SegmentProvider(segments), new PartitionConfigurationManager(null, TUNING_CONFIG), null, + null, 
keepSegmentGranularity, objectMapper ); @@ -727,6 +847,7 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException new SegmentProvider(segments), new PartitionConfigurationManager(null, TUNING_CONFIG), null, + null, keepSegmentGranularity, objectMapper ); @@ -738,28 +859,24 @@ public void testEmptyInterval() expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval")); - final CompactionTask task = new CompactionTask( - null, - null, - "foo", - Intervals.of("2000-01-01/2000-01-01"), - null, - null, - null, - null, - null, - null, + final Builder builder = new Builder( + DATA_SOURCE, objectMapper, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - new NoopChatHandlerProvider(), - null + null, + rowIngestionMetersFactory ); + + final CompactionTask task = builder + .interval(Intervals.of("2000-01-01/2000-01-01")) + .build(); } @Test public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, 5, 500000, 1000000L, @@ -792,6 +909,7 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL), new PartitionConfigurationManager(5L, tuningConfig), null, + null, keepSegmentGranularity, objectMapper ); @@ -845,14 +963,17 @@ private static List getDimensionSchema(DimensionSchema mixedTyp private static void assertIngestionSchema( List ingestionSchemas, List expectedDimensionsSpecs, + List expectedMetricsSpec, List expectedSegmentIntervals ) { assertIngestionSchema( ingestionSchemas, expectedDimensionsSpecs, + expectedMetricsSpec, expectedSegmentIntervals, new IndexTuningConfig( + null, 41943040, // automatically computed targetPartitionSize 500000, 1000000L, @@ -884,6 +1005,7 @@ private static void assertIngestionSchema( private static void assertIngestionSchema( List ingestionSchemas, List expectedDimensionsSpecs, + List expectedMetricsSpec, List expectedSegmentIntervals, IndexTuningConfig expectedTuningConfig ) @@ -911,11 +1033,13 @@ private static void assertIngestionSchema( new HashSet<>(expectedDimensionsSpec.getDimensions()), new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions()) ); - final Set expectedAggregators = AGGREGATORS.values() - .stream() - .map(AggregatorFactory::getCombiningFactory) - .collect(Collectors.toSet()); - Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators()))); + + // metrics + final List expectedAggregators = expectedMetricsSpec + .stream() + .map(AggregatorFactory::getCombiningFactory) + .collect(Collectors.toList()); + Assert.assertEquals(expectedAggregators, Arrays.asList(dataSchema.getAggregators())); Assert.assertEquals( new ArbitraryGranularitySpec( Granularities.NONE, @@ -935,16 +1059,10 @@ private static void assertIngestionSchema( Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval()); Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter()); - // check the order of dimensions Assert.assertEquals( new HashSet<>(expectedDimensionsSpec.getDimensionNames()), new HashSet<>(ingestSegmentFirehoseFactory.getDimensions()) ); - // check the order of metrics - Assert.assertEquals( - Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"), - ingestSegmentFirehoseFactory.getMetrics() - ); // assert tuningConfig Assert.assertEquals(expectedTuningConfig, 
ingestionSchema.getTuningConfig()); @@ -1049,9 +1167,14 @@ private static class TestIndexIO extends IndexIO columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()))); } else if (DIMENSIONS.containsKey(columnName)) { columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName))); - } else if (AGGREGATORS.containsKey(columnName)) { - columnMap.put(columnName, createColumn(AGGREGATORS.get(columnName))); - aggregatorFactories.add(AGGREGATORS.get(columnName)); + } else { + final Optional maybeMetric = AGGREGATORS.stream() + .filter(agg -> agg.getName().equals(columnName)) + .findAny(); + if (maybeMetric.isPresent()) { + columnMap.put(columnName, createColumn(maybeMetric.get())); + aggregatorFactories.add(maybeMetric.get()); + } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index a9616bf930fb..45aab98db946 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -242,7 +242,7 @@ public void testDeterminePartitions() throws Exception tmpDir, null, null, - createTuningConfigWithTargetPartitionSize(2, false, true), + createTuningConfigWithMaxRowsPerSegment(2, false, true), false ), null, @@ -288,7 +288,7 @@ public void testForceExtendableShardSpecs() throws Exception tmpDir, null, null, - createTuningConfigWithTargetPartitionSize(2, true, true), + createTuningConfigWithMaxRowsPerSegment(2, true, true), false ), null, @@ -340,7 +340,7 @@ public void testTransformSpec() throws Exception ) ), null, - createTuningConfigWithTargetPartitionSize(2, true, false), + createTuningConfigWithMaxRowsPerSegment(2, true, false), false ), null, @@ -384,7 +384,7 @@ public void testWithArbitraryGranularity() throws Exception Granularities.MINUTE, Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")) ), - createTuningConfigWithTargetPartitionSize(10, false, true), + createTuningConfigWithMaxRowsPerSegment(10, false, true), false ), null, @@ -421,7 +421,7 @@ public void testIntervalBucketing() throws Exception Granularities.HOUR, Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z")) ), - createTuningConfigWithTargetPartitionSize(50, false, true), + createTuningConfigWithMaxRowsPerSegment(50, false, true), false ), null, @@ -567,7 +567,7 @@ public void testAppendToExisting() throws Exception tmpDir, null, null, - createTuningConfigWithTargetPartitionSize(2, false, false), + createTuningConfigWithMaxRowsPerSegment(2, false, false), true ), null, @@ -617,7 +617,7 @@ public void testIntervalNotSpecified() throws Exception Granularities.MINUTE, null ), - createTuningConfigWithTargetPartitionSize(2, false, true), + createTuningConfigWithMaxRowsPerSegment(2, false, true), false ), null, @@ -680,7 +680,7 @@ public void testCSVFileWithHeader() throws Exception 0 ), null, - createTuningConfigWithTargetPartitionSize(2, false, true), + createTuningConfigWithMaxRowsPerSegment(2, false, true), false ), null, @@ -732,7 +732,7 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception 0 ), null, - createTuningConfigWithTargetPartitionSize(2, false, true), + createTuningConfigWithMaxRowsPerSegment(2, false, true), false ), null, @@ -1042,6 +1042,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception } final IndexTask.IndexTuningConfig tuningConfig 
= new IndexTask.IndexTuningConfig( + null, 2, null, null, @@ -1164,6 +1165,7 @@ public void testMultipleParseExceptionsFailure() throws Exception // Allow up to 3 parse exceptions, and save up to 2 parse exceptions final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig( + null, 2, null, null, @@ -1279,6 +1281,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc // Allow up to 3 parse exceptions, and save up to 2 parse exceptions final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig( + null, 2, null, null, @@ -1700,14 +1703,14 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( ); } - private static IndexTuningConfig createTuningConfigWithTargetPartitionSize( - int targetPartitionSize, + private static IndexTuningConfig createTuningConfigWithMaxRowsPerSegment( + int maxRowsPerSegment, boolean forceExtendableShardSpecs, boolean forceGuaranteedRollup ) { return createTuningConfig( - targetPartitionSize, + maxRowsPerSegment, 1, null, null, @@ -1740,7 +1743,7 @@ private static IndexTuningConfig createTuningConfigWithNumShards( } private static IndexTuningConfig createTuningConfig( - @Nullable Integer targetPartitionSize, + @Nullable Integer maxRowsPerSegment, @Nullable Integer maxRowsInMemory, @Nullable Long maxBytesInMemory, @Nullable Long maxTotalRows, @@ -1752,7 +1755,8 @@ private static IndexTuningConfig createTuningConfig( ) { return new IndexTask.IndexTuningConfig( - targetPartitionSize, + null, + maxRowsPerSegment, maxRowsInMemory, maxBytesInMemory, maxTotalRows, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index c4b307001b41..8bb6154e73db 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -105,7 +105,7 @@ public void testIndexTaskTuningConfigDefaults() throws Exception Assert.assertEquals(0, tuningConfig.getMaxPendingPersists()); Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory()); Assert.assertNull(tuningConfig.getNumShards()); - Assert.assertNull(tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getMaxRowsPerSegment()); } @Test @@ -116,7 +116,15 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc IndexTask.IndexTuningConfig.class ); - Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize()); + Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment()); + Assert.assertNull(tuningConfig.getNumShards()); + + tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\", \"maxRowsPerSegment\":10}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment()); Assert.assertNull(tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( @@ -124,7 +132,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc IndexTask.IndexTuningConfig.class ); - Assert.assertNull(tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getMaxRowsPerSegment()); Assert.assertEquals(10, (int) tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( @@ -132,7 +140,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc IndexTask.IndexTuningConfig.class ); - 
Assert.assertNull(tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getMaxRowsPerSegment()); Assert.assertEquals(10, (int) tuningConfig.getNumShards()); tuningConfig = jsonMapper.readValue( @@ -141,7 +149,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc ); Assert.assertNull(tuningConfig.getNumShards()); - Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize()); + Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment()); tuningConfig = jsonMapper.readValue( "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1}", @@ -149,7 +157,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc ); Assert.assertNull(tuningConfig.getNumShards()); - Assert.assertNull(tuningConfig.getTargetPartitionSize()); + Assert.assertNull(tuningConfig.getMaxRowsPerSegment()); } @Test @@ -184,6 +192,7 @@ public void testIndexTaskSerde() throws Exception ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTuningConfig( + null, 10000, 10, null, @@ -241,7 +250,7 @@ public void testIndexTaskSerde() throws Exception Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists()); Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory()); Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards()); - Assert.assertEquals(taskTuningConfig.getTargetPartitionSize(), task2TuningConfig.getTargetPartitionSize()); + Assert.assertEquals(taskTuningConfig.getMaxRowsPerSegment(), task2TuningConfig.getMaxRowsPerSegment()); Assert.assertEquals( taskTuningConfig.isForceExtendableShardSpecs(), task2TuningConfig.isForceExtendableShardSpecs() @@ -270,6 +279,7 @@ public void testIndexTaskwithResourceSerde() throws Exception ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), new IndexTuningConfig( + null, 10000, 10, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index ff3caedfab55..cc3ca979dcbb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index dd9096310d86..0fa747f6604f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -138,6 +138,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index ba5254db5ce7..7f9a7ae55d07 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -228,6 +228,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 0f1a2f60cfc2..66f0ec9d0c2e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -687,6 +687,7 @@ public void testIndexTask() throws Exception ), new IndexIOConfig(new MockFirehoseFactory(false), false), new IndexTuningConfig( + null, 10000, 10, null, @@ -768,6 +769,7 @@ public void testIndexTaskFailure() throws Exception ), new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), new IndexTuningConfig( + null, 10000, 10, null, @@ -1156,6 +1158,7 @@ public void testResumeTasks() throws Exception ), new IndexIOConfig(new MockFirehoseFactory(false), false), new IndexTuningConfig( + null, 10000, 10, null, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json index 8b3eab89fb23..819caae752a1 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json @@ -64,7 +64,7 @@ }, "tuningConfig": { "type": "index", - "targetPartitionSize": 3 + "maxRowsPerSegment": 3 } } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java index 1fe2b0417ea1..7d6d0bf1ec20 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ public class ClientCompactQuery private final String dataSource; private final List segments; private final boolean keepSegmentGranularity; + @Nullable private final Long targetCompactionSizeBytes; private final ClientCompactQueryTuningConfig tuningConfig; private final Map context; @@ -40,7 +42,7 @@ public ClientCompactQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity, - @JsonProperty("targetCompactionSizeBytes") Long targetCompactionSizeBytes, + @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig, @JsonProperty("context") Map context ) @@ -78,6 +80,7 @@ public boolean isKeepSegmentGranularity() } @JsonProperty + @Nullable public Long getTargetCompactionSizeBytes() { return targetCompactionSizeBytes; @@ -94,17 +97,4 @@ public Map getContext() { 
return context; } - - @Override - public String toString() - { - return "ClientCompactQuery{" + - "dataSource='" + dataSource + '\'' + - ", segments=" + segments + - ", keepSegmentGranularity=" + keepSegmentGranularity + - ", targetCompactionSizeBytes=" + targetCompactionSizeBytes + - ", tuningConfig=" + tuningConfig + - ", context=" + context + - '}'; - } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java index a15d7feb61aa..26248b8517fa 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig; import javax.annotation.Nullable; import java.util.Objects; @@ -35,26 +36,45 @@ public class ClientCompactQueryTuningConfig private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final long DEFAULT_PUBLISH_TIMEOUT = 0; + @Nullable + private final Integer maxRowsPerSegment; private final int maxRowsInMemory; private final int maxTotalRows; private final IndexSpec indexSpec; private final int maxPendingPersists; - private final long publishTimeout; + private final long pushTimeout; + + public static ClientCompactQueryTuningConfig from( + @Nullable UserCompactTuningConfig userCompactTuningConfig, + @Nullable Integer maxRowsPerSegment + ) + { + return new ClientCompactQueryTuningConfig( + maxRowsPerSegment, + userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxRowsInMemory(), + userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxTotalRows(), + userCompactTuningConfig == null ? null : userCompactTuningConfig.getIndexSpec(), + userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxPendingPersists(), + userCompactTuningConfig == null ? null : userCompactTuningConfig.getPushTimeout() + ); + } @JsonCreator public ClientCompactQueryTuningConfig( + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, - @JsonProperty("publishTimeout") @Nullable Long publishTimeout + @JsonProperty("pushTimeout") @Nullable Long pushTimeout ) { + this.maxRowsPerSegment = maxRowsPerSegment; this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; - this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; + this.pushTimeout = pushTimeout == null ? 
DEFAULT_PUBLISH_TIMEOUT : pushTimeout; } @JsonProperty @@ -63,6 +83,13 @@ public String getType() return "index"; } + @JsonProperty + @Nullable + public Integer getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + @JsonProperty public int getMaxRowsInMemory() { @@ -88,9 +115,9 @@ public int getMaxPendingPersists() } @JsonProperty - public long getPublishTimeout() + public long getPushTimeout() { - return publishTimeout; + return pushTimeout; } @Override @@ -99,47 +126,41 @@ public boolean equals(Object o) if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } - - final ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o; - - if (maxRowsInMemory != that.maxRowsInMemory) { - return false; - } - - if (maxTotalRows != that.maxTotalRows) { - return false; - } - - if (!indexSpec.equals(that.indexSpec)) { - return false; - } - - if (maxPendingPersists != that.maxPendingPersists) { - return false; - } - - return publishTimeout == that.publishTimeout; + ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o; + return maxRowsInMemory == that.maxRowsInMemory && + maxTotalRows == that.maxTotalRows && + maxPendingPersists == that.maxPendingPersists && + pushTimeout == that.pushTimeout && + Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(indexSpec, that.indexSpec); } @Override public int hashCode() { - return Objects.hash(maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, publishTimeout); + return Objects.hash( + maxRowsPerSegment, + maxRowsInMemory, + maxTotalRows, + indexSpec, + maxPendingPersists, + pushTimeout + ); } @Override public String toString() { return "ClientCompactQueryTuningConfig{" + - "maxRowsInMemory='" + maxRowsInMemory + - ", maxTotalRows='" + maxTotalRows + - ", indexSpec='" + indexSpec + - ", maxPendingPersists='" + maxPendingPersists + - ", publishTimeout='" + publishTimeout + - "}"; + "maxRowsPerSegment=" + maxRowsPerSegment + + ", maxRowsInMemory=" + maxRowsInMemory + + ", maxTotalRows=" + maxTotalRows + + ", indexSpec=" + indexSpec + + ", maxPendingPersists=" + maxPendingPersists + + ", pushTimeout=" + pushTimeout + + '}'; } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 09a4753ea3c0..07ae669dd425 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -91,7 +91,7 @@ public void killSegments(String dataSource, Interval interval) public String compactSegments( List segments, boolean keepSegmentGranularity, - long targetCompactionSizeBytes, + @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java index d9d35f5e6249..5c992baed858 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java @@ -39,7 +39,7 @@ public interface IndexingServiceClient String compactSegments( List segments, boolean keepSegmentGranularity, - long targetCompactionSizeBytes, + @Nullable Long targetCompactionSizeBytes, int 
compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index cefc421bc381..1e81792c4304 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -45,7 +45,8 @@ public interface AppenderatorConfig /** * Maximum number of rows in a single segment before pushing to deep storage */ - default int getMaxRowsPerSegment() + @Nullable + default Integer getMaxRowsPerSegment() { return Integer.MAX_VALUE; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java index e80658ee2cc1..7b45479ece21 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java @@ -102,10 +102,17 @@ public boolean isPersistRequired() public boolean isPushRequired(AppenderatorConfig tuningConfig) { - boolean overThreshold = getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment(); - Long maxTotal = tuningConfig.getMaxTotalRows(); - if (maxTotal != null) { - overThreshold |= getTotalNumRowsInAppenderator() >= maxTotal; + return isPushRequired(tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxTotalRows()); + } + + public boolean isPushRequired(@Nullable Integer maxRowsPerSegment, @Nullable Long maxTotalRows) + { + boolean overThreshold = false; + if (maxRowsPerSegment != null) { + overThreshold = getNumRowsInSegment() >= maxRowsPerSegment; + } + if (maxTotalRows != null) { + overThreshold |= getTotalNumRowsInAppenderator() >= maxTotalRows; } return overThreshold; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index 8a56e319b8f0..f190b2e2a2b1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -20,9 +20,12 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; +import org.apache.druid.segment.IndexSpec; import org.joda.time.Period; import javax.annotation.Nullable; @@ -44,27 +47,35 @@ public class DataSourceCompactionConfig private final boolean keepSegmentGranularity; private final int taskPriority; private final long inputSegmentSizeBytes; - private final long targetCompactionSizeBytes; + @Nullable + private final Long targetCompactionSizeBytes; // The number of input segments is limited because the byte size of a serialized task spec is limited by // RemoteTaskRunnerConfig.maxZnodeBytes. 
+ @Nullable + private final Integer maxRowsPerSegment; private final int maxNumSegmentsToCompact; private final Period skipOffsetFromLatest; - private final ClientCompactQueryTuningConfig tuningConfig; + private final UserCompactTuningConfig tuningConfig; private final Map taskContext; @JsonCreator public DataSourceCompactionConfig( @JsonProperty("dataSource") String dataSource, - @JsonProperty("keepSegmentGranularity") Boolean keepSegmentGranularity, + @JsonProperty("keepSegmentGranularity") @Nullable Boolean keepSegmentGranularity, @JsonProperty("taskPriority") @Nullable Integer taskPriority, @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes, @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact, @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest, - @JsonProperty("tuningConfig") @Nullable ClientCompactQueryTuningConfig tuningConfig, + @JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig, @JsonProperty("taskContext") @Nullable Map taskContext ) { + Preconditions.checkArgument( + targetCompactionSizeBytes == null || maxRowsPerSegment == null, + "targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together" + ); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.keepSegmentGranularity = keepSegmentGranularity == null ? DEFAULT_KEEP_SEGMENT_GRANULARITY @@ -75,9 +86,12 @@ public DataSourceCompactionConfig( this.inputSegmentSizeBytes = inputSegmentSizeBytes == null ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES : inputSegmentSizeBytes; - this.targetCompactionSizeBytes = targetCompactionSizeBytes == null - ? DEFAULT_TARGET_COMPACTION_SIZE_BYTES - : targetCompactionSizeBytes; + if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) { + this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES; + } else { + this.targetCompactionSizeBytes = targetCompactionSizeBytes; + } + this.maxRowsPerSegment = maxRowsPerSegment; this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null ? 
DEFAULT_NUM_INPUT_SEGMENTS : maxNumSegmentsToCompact; @@ -122,11 +136,19 @@ public int getMaxNumSegmentsToCompact() } @JsonProperty - public long getTargetCompactionSizeBytes() + @Nullable + public Long getTargetCompactionSizeBytes() { return targetCompactionSizeBytes; } + @JsonProperty + @Nullable + public Integer getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + @JsonProperty public Period getSkipOffsetFromLatest() { @@ -135,7 +157,7 @@ public Period getSkipOffsetFromLatest() @JsonProperty @Nullable - public ClientCompactQueryTuningConfig getTuningConfig() + public UserCompactTuningConfig getTuningConfig() { return tuningConfig; } @@ -150,49 +172,22 @@ public Map getTaskContext() @Override public boolean equals(Object o) { - if (o == this) { + if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } - - final DataSourceCompactionConfig that = (DataSourceCompactionConfig) o; - - if (!dataSource.equals(that.dataSource)) { - return false; - } - - if (keepSegmentGranularity != that.keepSegmentGranularity) { - return false; - } - - if (taskPriority != that.taskPriority) { - return false; - } - - if (inputSegmentSizeBytes != that.inputSegmentSizeBytes) { - return false; - } - - if (maxNumSegmentsToCompact != that.maxNumSegmentsToCompact) { - return false; - } - - if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) { - return false; - } - - if (!skipOffsetFromLatest.equals(that.skipOffsetFromLatest)) { - return false; - } - - if (!Objects.equals(tuningConfig, that.tuningConfig)) { - return false; - } - - return Objects.equals(taskContext, that.taskContext); + DataSourceCompactionConfig that = (DataSourceCompactionConfig) o; + return keepSegmentGranularity == that.keepSegmentGranularity && + taskPriority == that.taskPriority && + inputSegmentSizeBytes == that.inputSegmentSizeBytes && + maxNumSegmentsToCompact == that.maxNumSegmentsToCompact && + Objects.equals(dataSource, that.dataSource) && + Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) && + Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) && + Objects.equals(tuningConfig, that.tuningConfig) && + Objects.equals(taskContext, that.taskContext); } @Override @@ -203,11 +198,34 @@ public int hashCode() keepSegmentGranularity, taskPriority, inputSegmentSizeBytes, - maxNumSegmentsToCompact, targetCompactionSizeBytes, + maxNumSegmentsToCompact, skipOffsetFromLatest, tuningConfig, taskContext ); } + + public static class UserCompactTuningConfig extends ClientCompactQueryTuningConfig + { + @JsonCreator + public UserCompactTuningConfig( + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("pushTimeout") @Nullable Long pushTimeout + ) + { + super(null, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout); + } + + @Override + @Nullable + @JsonIgnore + public Integer getMaxRowsPerSegment() + { + throw new UnsupportedOperationException(); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index 82be5c4458da..b112f6c9ec84 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import it.unimi.dsi.fastutil.objects.Object2LongMap; +import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.ISE; @@ -151,14 +152,13 @@ private CoordinatorStats doRun( if (segmentsToCompact.size() > 1) { final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); - // Currently set keepSegmentGranularity to false because it breaks the algorithm of CompactionSegmentIterator to - // find segments to be compacted. + // make tuningConfig final String taskId = indexingServiceClient.compactSegments( segmentsToCompact, config.isKeepSegmentGranularity(), config.getTargetCompactionSizeBytes(), config.getTaskPriority(), - config.getTuningConfig(), + ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), config.getTaskContext() ); LOG.info( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java index d114fff0d7e9..295288829ca1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java @@ -297,7 +297,7 @@ private static SegmentsToCompact findSegmentsToCompact( segmentsToCompact.clear(); log.warn( "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than " - + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many " + + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many " + "segments, consider increasing 'numTargetCompactionSegments' and " + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.", chunks.size(), diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java index d395a6c2efad..49c87f558837 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java @@ -52,7 +52,7 @@ public void mergeSegments(List segments) public String compactSegments( List segments, boolean keepSegmentGranularity, - long targetCompactionSizeBytes, + @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, @Nullable ClientCompactQueryTuningConfig tuningConfig, @Nullable Map context diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java new file mode 100644 index 000000000000..201f9035de43 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class DataSourceCompactionConfigTest +{ + private static final ObjectMapper objectMapper = new DefaultObjectMapper(); + + @Test + public void testSerdeBasic() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + null, + 500L, + 100L, + null, + 20, + new Period(3600), + null, + ImmutableMap.of("key", "val") + ); + final String json = objectMapper.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertTrue(fromJson.isKeepSegmentGranularity()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes()); + Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + } + + @Test + public void testSerdeWithMaxRowsPerSegment() throws IOException + { + final DataSourceCompactionConfig config = new DataSourceCompactionConfig( + "dataSource", + null, + null, + 500L, + null, + 30, + 20, + new Period(3600), + null, + ImmutableMap.of("key", "val") + ); + final String json = objectMapper.writeValueAsString(config); + final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class); + + Assert.assertEquals(config.getDataSource(), fromJson.getDataSource()); + Assert.assertTrue(fromJson.isKeepSegmentGranularity()); + Assert.assertEquals(25, fromJson.getTaskPriority()); + Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes()); + Assert.assertNull(fromJson.getTargetCompactionSizeBytes()); + Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact()); + Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest()); + 
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig()); + Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext()); + } + + @Test + public void testSerdeUserCompactTuningConfig() throws IOException + { + final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null); + final String json = objectMapper.writeValueAsString(config); + // Check maxRowsPerSegment doesn't exist in the JSON string + Assert.assertFalse(json.contains("maxRowsPerSegment")); + final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class); + Assert.assertEquals(config, fromJson); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index 42518ea135ed..711c18121aa3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -45,6 +45,7 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -65,7 +66,7 @@ public class DruidCoordinatorSegmentCompactorTest public String compactSegments( List segments, boolean keepSegmentGranularity, - long targetCompactionSizeBytes, + @Nullable Long targetCompactionSizeBytes, int compactionTaskPriority, ClientCompactQueryTuningConfig tuningConfig, Map context @@ -463,6 +464,7 @@ private static List createCompactionConfigs(boolean 50L, 50L, null, + null, new Period("PT1H"), // smaller than segment interval null, null diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java index 5ddbaab6cd90..8f9705ece851 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java @@ -637,6 +637,7 @@ private DataSourceCompactionConfig createCompactionConfig( 0, targetCompactionSizeBytes, targetCompactionSizeBytes, + null, numTargetCompactionSegments, skipOffsetFromLatest, null, From 591695ec6a65796605224213e7aaf5166b883b29 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 27 Dec 2018 12:43:25 -0800 Subject: [PATCH 2/7] fix build --- .../server/coordinator/NewestSegmentFirstPolicyBenchmark.java | 1 + .../druid/server/coordinator/DataSourceCompactionConfig.java | 1 - .../server/coordinator/DataSourceCompactionConfigTest.java | 2 -- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 760c0a523a32..e256070f67f0 100644 --- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -96,6 +96,7 @@ public void setup() null, null, null, + null, null ) ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java index f190b2e2a2b1..ef33fb024b62 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.segment.IndexSpec; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java index 201f9035de43..eae6043c2148 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java @@ -19,10 +19,8 @@ package org.apache.druid.server.coordinator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig; import org.joda.time.Period; From c0f2a6f734fabf9597fecadf95c7c1559e022b6b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 27 Dec 2018 23:28:04 -0800 Subject: [PATCH 3/7] fix build --- .../kinesis/KinesisIndexTaskTuningConfig.java | 64 +++++-------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 95ee278985fe..534330e4d100 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -188,61 +188,29 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } + if (!super.equals(o)) { + return false; + } KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o; - return getMaxRowsInMemory() == that.getMaxRowsInMemory() && - getMaxBytesInMemory() == that.getMaxBytesInMemory() && - getMaxRowsPerSegment() == that.getMaxRowsPerSegment() && - getMaxPendingPersists() == that.getMaxPendingPersists() && - getBuildV9Directly() == that.getBuildV9Directly() && - isReportParseExceptions() == that.isReportParseExceptions() && - getHandoffConditionTimeout() == that.getHandoffConditionTimeout() && - isResetOffsetAutomatically() == that.isResetOffsetAutomatically() && - isSkipSequenceNumberAvailabilityCheck() == that.isSkipSequenceNumberAvailabilityCheck() && - getRecordBufferSize() == that.getRecordBufferSize() && - getRecordBufferOfferTimeout() == that.getRecordBufferOfferTimeout() && - getRecordBufferFullWait() == that.getRecordBufferFullWait() && - getFetchSequenceNumberTimeout() == 
that.getFetchSequenceNumberTimeout() && - isLogParseExceptions() == that.isLogParseExceptions() && - getMaxParseExceptions() == that.getMaxParseExceptions() && - getMaxSavedParseExceptions() == that.getMaxSavedParseExceptions() && - getMaxRecordsPerPoll() == that.getMaxRecordsPerPoll() && - Objects.equals(getIntermediatePersistPeriod(), that.getIntermediatePersistPeriod()) && - Objects.equals(getBasePersistDirectory(), that.getBasePersistDirectory()) && - Objects.equals(getIndexSpec(), that.getIndexSpec()) && - Objects.equals(getFetchThreads(), that.getFetchThreads()) && - Objects.equals(getSegmentWriteOutMediumFactory(), that.getSegmentWriteOutMediumFactory()) && - Objects.equals(getMaxTotalRows(), that.getMaxTotalRows()) && - Objects.equals(getIntermediateHandoffPeriod(), that.getIntermediateHandoffPeriod()); + return recordBufferSize == that.recordBufferSize && + recordBufferOfferTimeout == that.recordBufferOfferTimeout && + recordBufferFullWait == that.recordBufferFullWait && + fetchSequenceNumberTimeout == that.fetchSequenceNumberTimeout && + maxRecordsPerPoll == that.maxRecordsPerPoll && + Objects.equals(fetchThreads, that.fetchThreads); } @Override public int hashCode() { return Objects.hash( - getMaxRowsInMemory(), - getMaxBytesInMemory(), - getMaxRowsPerSegment(), - getMaxTotalRows(), - getIntermediatePersistPeriod(), - getBasePersistDirectory(), - getMaxPendingPersists(), - getIndexSpec(), - true, - isReportParseExceptions(), - getHandoffConditionTimeout(), - isResetOffsetAutomatically(), - isSkipSequenceNumberAvailabilityCheck(), - getRecordBufferSize(), - getRecordBufferOfferTimeout(), - getRecordBufferFullWait(), - getFetchSequenceNumberTimeout(), - getFetchThreads(), - getSegmentWriteOutMediumFactory(), - isLogParseExceptions(), - getMaxParseExceptions(), - getMaxSavedParseExceptions(), - getMaxRecordsPerPoll(), - getIntermediateHandoffPeriod() + super.hashCode(), + recordBufferSize, + recordBufferOfferTimeout, + recordBufferFullWait, + fetchSequenceNumberTimeout, + fetchThreads, + maxRecordsPerPoll ); } From 49d2ec9c0582164ead76e218fdd818f9ec6d03cb Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 28 Dec 2018 09:43:20 -0800 Subject: [PATCH 4/7] fix teamcity --- .../apache/druid/indexing/common/task/CompactionTaskTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 8a859ee76e12..211946af3c12 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -762,7 +762,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, - Lists.newArrayList(customMetricsSpec), + Arrays.asList(customMetricsSpec), SEGMENT_INTERVALS ); } else { @@ -770,7 +770,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, - Lists.newArrayList(customMetricsSpec), + Arrays.asList(customMetricsSpec), Collections.singletonList(COMPACTION_INTERVAL) ); } From ee1e8130be299840dbdd55b7bc4b4e431a702b94 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 2 Jan 2019 11:36:44 -0800 Subject: [PATCH 5/7] add test --- 
.../org/apache/druid/indexing/common/task/IndexTask.java | 2 +- .../apache/druid/indexing/common/task/TaskSerdeTest.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 42b45fdad899..fabd4e07051f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1536,7 +1536,7 @@ public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) } /** - * Return the target number of rows per segment. This returns null if it's not specified in tuningConfig. + * Return the max number of rows per segment. This returns null if it's not specified in tuningConfig. * Please use {@link IndexTask#getValidMaxRowsPerSegment} instead to get the valid value. */ @Nullable diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 8bb6154e73db..fc1b794bd609 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -119,6 +119,13 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment()); Assert.assertNull(tuningConfig.getNumShards()); + tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\"}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertNull(tuningConfig.getMaxRowsPerSegment()); + tuningConfig = jsonMapper.readValue( "{\"type\":\"index\", \"maxRowsPerSegment\":10}", IndexTask.IndexTuningConfig.class From 5ace6adf6e0e5e8fd51d162720ce93c6b3a1533a Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 4 Jan 2019 12:15:07 -0800 Subject: [PATCH 6/7] fix test --- .../apache/druid/indexing/common/task/CompactionTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 4ad72721e9b5..f759cc7c1c62 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -774,7 +774,7 @@ public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, s2.getDataSchema().getGranularitySpec().inputIntervals().get(0) ) ); - Assert.assertEquals(5, ingestionSpecs.size()); + Assert.assertEquals(6, ingestionSpecs.size()); assertIngestionSchema( ingestionSpecs, expectedDimensionsSpec, From c31ecf22fc242e40dd447e9d4191c873151ee448 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 8 Jan 2019 13:54:51 -0800 Subject: [PATCH 7/7] address comment --- docs/content/ingestion/compaction.md | 4 ++-- .../druid/client/indexing/ClientCompactQueryTuningConfig.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content/ingestion/compaction.md b/docs/content/ingestion/compaction.md index e1e59453d26d..4d1b71b0072b 100644 --- a/docs/content/ingestion/compaction.md +++ b/docs/content/ingestion/compaction.md @@ -51,7 +51,7 @@ Compaction tasks merge 
all segments of the given interval. The syntax is:
 |`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
 |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See [segmentGranularity of Uniform Granularity Spec](./ingestion-spec.html#uniform-granularity-spec) for more details. See the below table for the behavior.|No|
 |`keepSegmentGranularity`|Deprecated. Please use `segmentGranularity` instead. See the below table for its behavior.|No|
-|`targetCompactionSizeBytes`|Target segment size after comapction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
+|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `maxRowsPerSegment`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
 |`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
 |`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No|
@@ -78,7 +78,7 @@ An example of compaction task is
 This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01`
 and results in new segments. Since both `segmentGranularity` and `keepSegmentGranularity` are null, the original segment
 granularity will be remained and not changed after compaction.
-To control the number of result segments per time chunk, you can set `maxRowsPerSegment` or `numShards`. See [indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more details.
+To control the number of result segments per time chunk, you can set [maxRowsPerSegment](../configuration/index.html#compaction-dynamic-configuration) or [numShards](../ingestion/native_tasks.html#tuningconfig).
 
 Please note that you can run multiple compactionTasks at the same time. For example, you can run 12 compactionTasks per month instead of running a single task for the entire year.
 A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index 26248b8517fa..9bae161c76a0 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -34,7 +34,7 @@ public class ClientCompactQueryTuningConfig
   private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
   private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
   private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
-  private static final long DEFAULT_PUBLISH_TIMEOUT = 0;
+  private static final long DEFAULT_PUSH_TIMEOUT = 0;
   @Nullable
   private final Integer maxRowsPerSegment;
@@ -74,7 +74,7 @@ public ClientCompactQueryTuningConfig(
     this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
     this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
     this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
-    this.pushTimeout = pushTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : pushTimeout;
+    this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
   }
   @JsonProperty
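
For illustration of the option this patch series adds, a minimal sketch of a per-dataSource auto compaction config entry that sets `maxRowsPerSegment` instead of `targetCompactionSizeBytes` (per the updated docs, the two settings are mutually exclusive). The dataSource name and row count below are hypothetical; the field names follow the `DataSourceCompactionConfig` serde tests in this patch.

{
  "dataSource": "wikipedia",
  "maxRowsPerSegment": 5000000,
  "skipOffsetFromLatest": "P1D"
}

The same shape with both `maxRowsPerSegment` and `targetCompactionSizeBytes` present would conflict with the documented mutual exclusion, so only one of them should be supplied.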
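
Similarly, a hedged sketch of a manual compaction task spec that controls segment size through `maxRowsPerSegment` in its tuningConfig, as described in the compaction.md change above. This is not the example referenced in that doc; the dataSource and row count are made up, and it assumes the standard `compact` task type with the `index`-type tuningConfig of the native index task. `targetCompactionSizeBytes` is deliberately omitted because it cannot be combined with `maxRowsPerSegment`.

{
  "type": "compact",
  "dataSource": "wikipedia",
  "interval": "2017-01-01/2018-01-01",
  "tuningConfig": {
    "type": "index",
    "maxRowsPerSegment": 5000000
  }
}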