From 4b60c7eea74851d46d8fd17f8e87ed0b16c6448b Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Wed, 31 Jan 2024 14:31:20 +0530
Subject: [PATCH] Kill tasks honor the buffer period of unused segments (#15710) (#15811)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Kill tasks should honor the buffer period of unused segments.

- The coordinator duty KillUnusedSegments determines an umbrella interval for each datasource as the kill interval. There can be multiple unused segments in an umbrella interval with different used_status_last_updated timestamps. For example, consider an unused segment that is 30 days old and one that is 1 hour old. Currently, the kill task after the 30-day mark would kill both the unused segments and not retain the 1-hour-old one.

- However, when a kill task is instantiated with this umbrella interval, it would kill all the unused segments regardless of the last updated timestamp. We need kill tasks and RetrieveUnusedSegmentsAction to honor the bufferPeriod to avoid killing unused segments in the kill interval prematurely.

* Clarify default behavior in docs.

* test comments

* fix canDutyRun()

* small updates.

* checkstyle

* forbidden api fix

* doc fix, unused import, codeql scan error, and cleanup logs.

* Address review comments

* Rename maxUsedFlagLastUpdatedTime to maxUsedStatusLastUpdatedTime

This is consistent with the column name `used_status_last_updated`.

* Apply suggestions from code review

* Make period Duration type

* Remove older variants of runKillTask() in OverlordClient interface

* Test can now run without waiting for canDutyRun().

* Remove previous variants of retrieveUnusedSegments from internal metadata storage coordinator interface.

Removes the following interface methods in favor of a new method added:
- retrieveUnusedSegmentsForInterval(String, Interval)
- retrieveUnusedSegmentsForInterval(String, Interval, Integer)

* Chain stream operations

* cleanup

* Pass in the lastUpdatedTime to markUnused test function and remove sleep.
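For illustration only (this sketch is not part of the patch): the task constructor gains a trailing `maxUsedStatusLastUpdatedTime` argument, so a caller that wants a 30-day buffer can pass a cutoff of "now minus 30 days". The datasource name and interval below are hypothetical, and the sketch assumes the Druid indexing-service classes from this patch are on the classpath.

```java
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.DateTime;

public class KillTaskBufferPeriodSketch
{
  public static void main(String[] args)
  {
    // Segments whose used_status_last_updated is later than this cutoff are retained,
    // which is how a kill task can honor a 30-day buffer period.
    final DateTime cutoff = DateTimes.nowUtc().minusDays(30);

    final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
        null,                                  // id: generated when null
        "wikipedia",                           // dataSource (hypothetical)
        Intervals.of("2023-01-01/2024-01-01"), // umbrella kill interval (hypothetical)
        null,                                  // context
        false,                                 // markAsUnused (deprecated)
        100,                                   // batchSize (the default)
        null,                                  // limit: no cap on deleted segments
        cutoff                                 // maxUsedStatusLastUpdatedTime
    );

    // Passing null instead of a cutoff keeps the old behavior: every unused
    // segment in the interval is eligible for deletion.
    System.out.println(task.getMaxUsedStatusLastUpdatedTime());
  }
}
```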
--------- Co-authored-by: Abhishek Radhakrishnan Co-authored-by: Kashif Faraz --- docs/data-management/delete.md | 10 +- docs/operations/clean-metadata-store.md | 11 +- .../actions/RetrieveUnusedSegmentsAction.java | 18 +- .../indexing/common/task/ArchiveTask.java | 2 +- .../common/task/KillUnusedSegmentsTask.java | 39 +- .../druid/indexing/common/task/MoveTask.java | 2 +- .../indexing/common/task/RestoreTask.java | 2 +- .../actions/RetrieveSegmentsActionsTest.java | 20 +- ...tKillUnusedSegmentsTaskQuerySerdeTest.java | 37 +- .../task/KillUnusedSegmentsTaskTest.java | 495 ++++++++++++++++-- .../indexing/overlord/TaskLifecycleTest.java | 20 +- .../overlord/http/OverlordResourceTest.java | 2 +- ...TestIndexerMetadataStorageCoordinator.java | 28 +- .../ClientKillUnusedSegmentsTaskQuery.java | 35 +- .../IndexerMetadataStorageCoordinator.java | 16 +- .../IndexerSQLMetadataStorageCoordinator.java | 22 +- .../metadata/SegmentsMetadataManager.java | 18 +- .../metadata/SqlSegmentsMetadataManager.java | 9 +- .../metadata/SqlSegmentsMetadataQuery.java | 56 +- .../druid/rpc/indexing/OverlordClient.java | 34 +- .../coordinator/duty/KillUnusedSegments.java | 117 +++-- .../server/http/DataSourcesResource.java | 2 +- ...ClientKillUnusedSegmentsTaskQueryTest.java | 3 +- ...exerSQLMetadataStorageCoordinatorTest.java | 213 ++++++-- .../rpc/indexing/OverlordClientImplTest.java | 1 + .../duty/KillUnusedSegmentsTest.java | 56 +- .../simulate/TestSegmentsMetadataManager.java | 2 +- .../server/http/DataSourcesResourceTest.java | 2 +- 28 files changed, 980 insertions(+), 292 deletions(-) diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index 6acd2fc782bb..fccb14007b94 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -95,9 +95,10 @@ The available grammar is: "id": , "dataSource": , "interval" : , - "context": , - "batchSize": , - "limit": + "context": , + "batchSize": , + "limit": , + "maxUsedStatusLastUpdatedTime": } ``` @@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below: | Parameter | Default | Explanation | |-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.| -| `limit` | null - no limit | Maximum number of segments for the kill task to delete.| +| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.| +| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. 
The default behavior is to kill all unused segments in the `interval` regardless of when they were marked as unused.| **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and diff --git a/docs/operations/clean-metadata-store.md b/docs/operations/clean-metadata-store.md index 202b27805edf..b30374123ee3 100644 --- a/docs/operations/clean-metadata-store.md +++ b/docs/operations/clean-metadata-store.md @@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources. - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`. - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion. +- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed. +- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused. - `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task. ### Audit records @@ -189,15 +191,15 @@ druid.coordinator.kill.datasource.on=false ## Example configuration for automated metadata cleanup -Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old. The exception is for audit logs, which you need to retain for 30 days: +Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven-day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days: ```properties ... # Schedule the metadata management store task for every hour: -druid.coordinator.period.metadataStoreManagementPeriod=P1H +druid.coordinator.period.metadataStoreManagementPeriod=PT1H -# Set a kill task to poll every day to delete Segment records and segments -# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true, +# Set a kill task to poll every day to delete segment records and segments +# in deep storage > 4 days old after a 7-day buffer period. When druid.coordinator.kill.on is set to true, # you can set killDataSourceWhitelist in the dynamic configuration to limit # the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration. @@ -205,6 +207,7 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H druid.coordinator.kill.on=true druid.coordinator.kill.period=P1D druid.coordinator.kill.durationToRetain=P4D +druid.coordinator.kill.bufferPeriod=P7D druid.coordinator.kill.maxSegments=1000 # Poll every day to delete audit records > 30 days old diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java index 150648858c15..bb188952966a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.timeline.DataSegment; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction> getReturnTypeReference() { @@ -83,7 +96,7 @@ public TypeReference> getReturnTypeReference() public List perform(Task task, TaskActionToolbox toolbox) { return toolbox.getIndexerMetadataStorageCoordinator() - .retrieveUnusedSegmentsForInterval(dataSource, interval, limit); + .retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime); } @Override @@ -99,6 +112,7 @@ public String toString() "dataSource='" + dataSource + '\'' + ", interval=" + interval + ", limit=" + limit + + ", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java index 42d04316a41f..957317ad93cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ArchiveTask.java @@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 54fae94684fd..cbf4a84ba790 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -63,9 +63,10 @@ /** * The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}. * JSON serialization fields of this class must correspond to those of {@link - * ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields. + * ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields. *

* <p>
+ * The field {@link #isMarkAsUnused()} is now deprecated.
+ * </p>

*/ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask { @@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask */ @Nullable private final Integer limit; + /** + * The maximum used status last updated time. Any segments with + * {@code used_status_last_updated} no later than this time will be included in the kill task. + */ + @Nullable private final DateTime maxUsedStatusLastUpdatedTime; + @JsonCreator public KillUnusedSegmentsTask( @JsonProperty("id") String id, @@ -103,7 +110,8 @@ public KillUnusedSegmentsTask( @JsonProperty("context") Map context, @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, @JsonProperty("batchSize") Integer batchSize, - @JsonProperty("limit") @Nullable Integer limit + @JsonProperty("limit") @Nullable Integer limit, + @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime ) { super( @@ -115,15 +123,16 @@ public KillUnusedSegmentsTask( this.markAsUnused = markAsUnused != null && markAsUnused; this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE; if (this.batchSize <= 0) { - throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit); + throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize); } if (limit != null && limit <= 0) { - throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit); + throw InvalidInput.exception("limit[%d] must be a positive integer.", limit); } if (limit != null && Boolean.TRUE.equals(markAsUnused)) { - throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled."); + throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit); } this.limit = limit; + this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime; } /** @@ -155,6 +164,13 @@ public Integer getLimit() return limit; } + @Nullable + @JsonProperty + public DateTime getMaxUsedStatusLastUpdatedTime() + { + return maxUsedStatusLastUpdatedTime; + } + @Override public String getType() { @@ -180,7 +196,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit( new MarkSegmentsAsUnusedAction(getDataSource(), getInterval()) ); - LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused); + LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.", + numSegmentsMarkedAsUnused, getDataSource(), getInterval()); } else { numSegmentsMarkedAsUnused = 0; } @@ -190,9 +207,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception @Nullable Integer numTotalBatches = getNumTotalBatches(); List unusedSegments; LOG.info( - "Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s", + "Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments " + + "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s", + getDataSource(), + getInterval(), batchSize, limit, + maxUsedStatusLastUpdatedTime, numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "." 
); @@ -217,7 +238,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize)); + .submit( + new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime + )); // Fetch locks each time as a revokal could have occurred in between batches final NavigableMap> taskLockMap diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java index d23b3820db74..bea3685ca244 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/MoveTask.java @@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java index 1364bcb597fe..cc635383fed5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RestoreTask.java @@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // List unused segments final List unusedSegments = toolbox .getTaskActionClient() - .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null)); + .submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null)); // Verify none of these segments have versions > lock version for (final DataSegment unusedSegment : unusedSegments) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 09f724bcfc1c..2dee594aad68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.common.actions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -103,7 +105,23 @@ public void testRetrieveUsedSegmentsAction() @Test public void testRetrieveUnusedSegmentsAction() { - final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null); + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), 
INTERVAL, null, null); + final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(expectedUnusedSegments, resultSegments); + } + + @Test + public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime() + { + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN); + final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(ImmutableSet.of(), resultSegments); + } + + @Test + public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime() + { + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc()); final Set resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); Assert.assertEquals(expectedUnusedSegments, resultSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java index 3ab6bae4688f..7e7c9088d61f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -24,6 +24,7 @@ import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery; import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.junit.Assert; import org.junit.Before; @@ -53,7 +54,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Intervals.of("2020-01-01/P1D"), false, 99, - 5 + 5, + DateTimes.nowUtc() ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); @@ -63,7 +65,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize())); Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit()); - + Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime()); } @Test @@ -75,6 +77,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault Intervals.of("2020-01-01/P1D"), true, null, + null, null ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); @@ -85,6 +88,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); Assert.assertEquals(100, fromJson.getBatchSize()); Assert.assertNull(taskQuery.getLimit()); + Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime()); } @Test @@ -97,6 +101,7 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro null, true, 99, + null, null ); final byte[] json = objectMapper.writeValueAsBytes(task); @@ -110,5 +115,33 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro 
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused()); Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); Assert.assertNull(task.getLimit()); + Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime()); + } + + @Test + public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery() throws IOException + { + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask( + null, + "datasource", + Intervals.of("2020-01-01/P1D"), + null, + null, + 99, + 100, + DateTimes.nowUtc() + ); + final byte[] json = objectMapper.writeValueAsBytes(task); + final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue( + json, + ClientTaskQuery.class + ); + Assert.assertEquals(task.getId(), taskQuery.getId()); + Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource()); + Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); + Assert.assertNull(taskQuery.getMarkAsUnused()); + Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); + Assert.assertEquals(task.getLimit(), taskQuery.getLimit()); + Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index d77f2e6c2439..e2c433536a2b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.KillTaskReport; import org.apache.druid.indexing.common.TaskReport; @@ -29,8 +31,11 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.timeline.DataSegment; import org.assertj.core.api.Assertions; +import org.hamcrest.MatcherAssert; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -40,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class KillUnusedSegmentsTaskTest extends IngestionTestBase { @@ -86,18 +92,24 @@ public void testKill() throws Exception null, false, null, + null, null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); - final List unusedSegments = - getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); - Assertions.assertThat( - getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"), 
Segments.ONLY_VISIBLE) + Assertions.assertThat(getMetadataStorageCoordinator().retrieveUsedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + Segments.ONLY_VISIBLE) ).containsExactlyInAnyOrder( newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) @@ -135,13 +147,19 @@ public void testKillWithMarkUnused() throws Exception null, true, null, + null, null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); final List unusedSegments = - getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); Assert.assertEquals(ImmutableList.of(newSegment(Intervals.of("2019-02-01/2019-03-01"), version)), unusedSegments); Assertions.assertThat( @@ -166,6 +184,7 @@ public void testGetInputSourceResources() null, true, null, + null, null ); Assert.assertTrue(task.getInputSourceResources().isEmpty()); @@ -176,10 +195,10 @@ public void testKillBatchSizeOneAndLimit4() throws Exception { final String version = DateTimes.nowUtc().toString(); final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-02-01/2019-03-01"), version), + newSegment(Intervals.of("2019-03-01/2019-04-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); @@ -194,58 +213,374 @@ public void testKillBatchSizeOneAndLimit4() throws Exception ); final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01"), - null, - false, - 1, - 4 - ); + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 1, + 4, + null + ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); // we expect ALL tasks to be deleted final List unusedSegments = - getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); Assert.assertEquals(Collections.emptyList(), unusedSegments); Assert.assertEquals(new KillTaskReport.Stats(4, 4, 0), getReportedStats()); } + /** + * Test kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated} + * timestamps. A kill task submitted with null {@code maxUsedStatusLastUpdatedTime} will kill all the unused segments in the kill + * interval. 
+ */ + @Test + public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); + final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); + + Assert.assertEquals(segments, announced); + + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment1.getInterval() + ) + ); + + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment4.getInterval() + ) + ); + + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment3.getInterval() + ) + ); + + final List segmentIntervals = segments.stream() + .map(DataSegment::getInterval) + .collect(Collectors.toList()); + + final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); + + + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + null, + false, + 1, + 10, + null + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + final List unusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(), unusedSegments); + Assert.assertEquals(new KillTaskReport.Stats(3, 4, 0), getReportedStats()); + } + + /** + * Test kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated} + * timestamps. Consider: + *
<li> {@code segment1}, {@code segment2} and {@code segment3} have t1, t2 and t3 {@code used_status_last_updated} timestamps
+ * respectively, where t1 < t2 < t3 </li>
+ * <li> {@code segment4} is a used segment and therefore shouldn't be killed </li>
+ *
+ * <p>
+ * A kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment2}.
+ * After that, a kill task submitted with t3 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment3}.
+ * </p>
    + */ + @Test + public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); + final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); + + Assert.assertEquals(segments, announced); + + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment1.getInterval() + ) + ); + + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment4.getInterval() + ) + ); + + // Capture the last updated time cutoff + final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); + + // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again + Thread.sleep(1000); + + // now mark the third segment as unused + Assert.assertEquals( + 1, + getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( + DATA_SOURCE, + segment3.getInterval() + ) + ); + + final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); + + + final List segmentIntervals = segments.stream() + .map(DataSegment::getInterval) + .collect(Collectors.toList()); + + final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); + + final KillUnusedSegmentsTask task1 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime1 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); + + final List unusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(segment3), unusedSegments); + Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + + final KillUnusedSegmentsTask task2 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime2 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); + + final List unusedSegments2 = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(), unusedSegments2); + Assert.assertEquals(new KillTaskReport.Stats(1, 2, 0), getReportedStats()); + } + + /** + * Similar to {@link #testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime()}}, but with a different setup. + *

+ * Tests kill functionality of multiple unused segments in a wide interval with different {@code used_status_last_updated}
+ * timestamps. Consider:
+ * <li> {@code segment1} and {@code segment4} have t1 {@code used_status_last_updated} timestamp </li>
+ * <li> {@code segment2} and {@code segment3} have t2 {@code used_status_last_updated} timestamp, where t1 < t2 </li>
+ *
+ * <p>
+ * A kill task submitted with t1 as the {@code maxUsedStatusLastUpdatedTime} should only kill {@code segment1} and {@code segment4}.
+ * After that, a kill task submitted with t2 as the {@code maxUsedStatusLastUpdatedTime} should kill {@code segment2} and {@code segment3}.
+ * </p>
    + */ + @Test + public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedTime2() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final DataSegment segment1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), version); + final DataSegment segment2 = newSegment(Intervals.of("2019-02-01/2019-03-01"), version); + final DataSegment segment3 = newSegment(Intervals.of("2019-03-01/2019-04-01"), version); + final DataSegment segment4 = newSegment(Intervals.of("2019-04-01/2019-05-01"), version); + + final Set segments = ImmutableSet.of(segment1, segment2, segment3, segment4); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); + + Assert.assertEquals(segments, announced); + + Assert.assertEquals( + 2, + getSegmentsMetadataManager().markSegmentsAsUnused( + ImmutableSet.of( + segment1.getId(), + segment4.getId() + ) + ) + ); + + final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); + + // Delay for 1s, mark the segments as unused and then capture the last updated time cutoff again + Thread.sleep(1000); + + Assert.assertEquals( + 2, + getSegmentsMetadataManager().markSegmentsAsUnused( + ImmutableSet.of( + segment2.getId(), + segment3.getId() + ) + ) + ); + + final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); + + + final List segmentIntervals = segments.stream() + .map(DataSegment::getInterval) + .collect(Collectors.toList()); + + final Interval umbrellaInterval = JodaUtils.umbrellaInterval(segmentIntervals); + + + final KillUnusedSegmentsTask task1 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime1 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task1).get().getStatusCode()); + + final List unusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(segment2, segment3), unusedSegments); + Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + + final KillUnusedSegmentsTask task2 = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + umbrellaInterval, + null, + false, + 1, + 10, + maxUsedStatusLastUpdatedTime2 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task2).get().getStatusCode()); + + final List unusedSegments2 = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + umbrellaInterval, + null, + null + ); + + Assert.assertEquals(ImmutableList.of(), unusedSegments2); + Assert.assertEquals(new KillTaskReport.Stats(2, 3, 0), getReportedStats()); + } + @Test public void testKillBatchSizeThree() throws Exception { final String version = DateTimes.nowUtc().toString(); final Set segments = ImmutableSet.of( - newSegment(Intervals.of("2019-01-01/2019-02-01"), version), - newSegment(Intervals.of("2019-02-01/2019-03-01"), version), - newSegment(Intervals.of("2019-03-01/2019-04-01"), version), - newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-02-01/2019-03-01"), version), + newSegment(Intervals.of("2019-03-01/2019-04-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); final KillUnusedSegmentsTask task = - new KillUnusedSegmentsTask( - null, - DATA_SOURCE, - 
Intervals.of("2018-01-01/2020-01-01"), - null, - true, - 3, - null - ); + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + true, + 3, + null, + null + ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); // we expect ALL tasks to be deleted - final List unusedSegments = - getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + final List unusedSegments = getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2019/2020"), + null, + null + ); Assert.assertEquals(Collections.emptyList(), unusedSegments); @@ -263,6 +598,7 @@ public void testComputeNextBatchSizeDefault() null, false, null, + null, null ); Assert.assertEquals(100, task.computeNextBatchSize(50)); @@ -279,7 +615,8 @@ public void testComputeNextBatchSizeWithBatchSizeLargerThanLimit() null, false, 10, - 5 + 5, + null ); Assert.assertEquals(5, task.computeNextBatchSize(0)); } @@ -295,7 +632,8 @@ public void testComputeNextBatchSizeWithBatchSizeSmallerThanLimit() null, false, 5, - 10 + 10, + null ); Assert.assertEquals(5, task.computeNextBatchSize(0)); } @@ -311,7 +649,8 @@ public void testComputeNextBatchSizeWithRemainingLessThanLimit() null, false, 5, - 10 + 10, + null ); Assert.assertEquals(3, task.computeNextBatchSize(7)); } @@ -327,6 +666,7 @@ public void testGetNumTotalBatchesDefault() null, false, null, + null, null ); Assert.assertNull(task.getNumTotalBatches()); @@ -343,11 +683,81 @@ public void testGetNumTotalBatchesWithBatchSizeLargerThanLimit() null, false, 10, - 5 + 5, + null ); Assert.assertEquals(1, (int) task.getNumTotalBatches()); } + @Test + public void testInvalidLimit() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 10, + 0, + null + ) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "limit[0] must be a positive integer." + ) + ); + } + + @Test + public void testInvalidBatchSize() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + false, + 0, + 10, + null + ) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "batchSize[0] must be a positive integer." + ) + ); + } + + @Test + public void testInvalidMarkAsUnusedWithLimit() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + true, + 10, + 10, + null + ) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "limit[10] cannot be provided when markAsUnused is enabled." 
+ ) + ); + } + @Test public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() { @@ -359,7 +769,8 @@ public void testGetNumTotalBatchesWithBatchSizeSmallerThanLimit() null, false, 5, - 10 + 10, + null ); Assert.assertEquals(2, (int) task.getNumTotalBatches()); } @@ -387,7 +798,9 @@ private KillTaskReport.Stats getReportedStats() try { Object payload = getObjectMapper().readValue( taskRunner.getTaskReportsFile(), - new TypeReference>() { } + new TypeReference>() + { + } ).get(KillTaskReport.REPORT_KEY).getPayload(); return getObjectMapper().convertValue(payload, KillTaskReport.Stats.class); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 94d21c144dc1..002c70262cb6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -903,7 +903,13 @@ public DataSegment apply(String input) // manually create local segments files List segmentFiles = new ArrayList<>(); - for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) { + final List unusedSegments = mdc.retrieveUnusedSegmentsForInterval( + "test_kill_task", + Intervals.of("2011-04-01/P4D"), + null, + null + ); + for (DataSegment segment : unusedSegments) { File file = new File((String) segment.getLoadSpec().get("path")); FileUtils.mkdirp(file.getParentFile()); Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY); @@ -918,6 +924,7 @@ public DataSegment apply(String input) null, false, null, + null, null ); @@ -993,7 +1000,13 @@ public DataSegment apply(String input) // manually create local segments files List segmentFiles = new ArrayList<>(); - for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) { + final List unusedSegments = mdc.retrieveUnusedSegmentsForInterval( + "test_kill_task", + Intervals.of("2011-04-01/P4D"), + null, + null + ); + for (DataSegment segment : unusedSegments) { File file = new File((String) segment.getLoadSpec().get("path")); FileUtils.mkdirp(file.getParentFile()); Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY); @@ -1009,7 +1022,8 @@ public DataSegment apply(String input) null, false, null, - maxSegmentsToKill + maxSegmentsToKill, + null ); final TaskStatus status = runTask(killUnusedSegmentsTask); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index cdaf5b9b359a..6897e69c26b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -926,7 +926,7 @@ public void testKillTaskIsAudited() auditManager ); - Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null); + Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, false, 10, null, null); overlordResource.taskPost(task, req); Assert.assertTrue(auditEntryCapture.hasCaptured()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 2fc80adceac1..f42a300de5f3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -34,6 +34,7 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.PartialShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -43,7 +44,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Stream; public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator { @@ -104,31 +104,21 @@ public List<DataSegment> retrieveUsedSegmentsForIntervals( return ImmutableList.of(); } - @Override - public List<DataSegment> retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) - { - synchronized (unusedSegments) { - return ImmutableList.copyOf(unusedSegments); - } - } - @Override public List<DataSegment> retrieveUnusedSegmentsForInterval( String dataSource, Interval interval, - @Nullable Integer limit + @Nullable Integer limit, + @Nullable DateTime maxUsedStatusLastUpdatedTime ) { synchronized (unusedSegments) { - Stream<DataSegment> resultStream = unusedSegments.stream(); - - resultStream = resultStream.filter(ds -> !nuked.contains(ds)); - - if (limit != null) { - resultStream = resultStream.limit(limit); - } - - return ImmutableList.copyOf(resultStream.iterator()); + return ImmutableList.copyOf( + unusedSegments.stream() + .filter(ds -> !nuked.contains(ds)) + .limit(limit != null ? limit : Long.MAX_VALUE) + .iterator() + ); } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index 279c6699ff92..e5656ff39752 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -21,16 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; +import org.apache.druid.error.InvalidInput; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Objects; /** - * Client representation of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. JSON searialization - * fields of this class must correspond to those of org.apache.druid.indexing.common.task.KillUnusedSegmentsTask, except - * for "id" and "context" fields. + * Client representation of {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}. JSON serialization + * fields of this class must correspond to those of {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}, + * except for {@code id} and {@code context} fields.
*/ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery { @@ -42,6 +43,7 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery private final Boolean markAsUnused; private final Integer batchSize; @Nullable private final Integer limit; + @Nullable private final DateTime maxUsedStatusLastUpdatedTime; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( @@ -50,16 +52,23 @@ public ClientKillUnusedSegmentsTaskQuery( @JsonProperty("interval") Interval interval, @JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused, @JsonProperty("batchSize") Integer batchSize, - @JsonProperty("limit") Integer limit + @JsonProperty("limit") @Nullable Integer limit, + @JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime ) { - this.id = Preconditions.checkNotNull(id, "id"); + if (id == null) { + throw InvalidInput.exception("kill task id cannot be null"); + } + if (limit != null && limit <= 0) { + throw InvalidInput.exception("limit[%d] must be a positive integer.", limit); + } + this.id = id; this.dataSource = dataSource; this.interval = interval; this.markAsUnused = markAsUnused; this.batchSize = batchSize; - Preconditions.checkArgument(limit == null || limit > 0, "limit must be > 0"); this.limit = limit; + this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime; } @JsonProperty @@ -116,6 +125,13 @@ public Integer getLimit() return limit; } + @JsonProperty + @Nullable + public DateTime getMaxUsedStatusLastUpdatedTime() + { + return maxUsedStatusLastUpdatedTime; + } + @Override public boolean equals(Object o) @@ -132,12 +148,13 @@ public boolean equals(Object o) && Objects.equals(interval, that.interval) && Objects.equals(markAsUnused, that.markAsUnused) && Objects.equals(batchSize, that.batchSize) - && Objects.equals(limit, that.limit); + && Objects.equals(limit, that.limit) + && Objects.equals(maxUsedStatusLastUpdatedTime, that.maxUsedStatusLastUpdatedTime); } @Override public int hashCode() { - return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit); + return Objects.hash(id, dataSource, interval, markAsUnused, batchSize, limit, maxUsedStatusLastUpdatedTime); } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index dd0f7d8c98ab..31c975339007 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.PartialShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -126,14 +127,6 @@ Collection retrieveUsedSegmentsForIntervals( Segments visibility ); - /** - * see {@link #retrieveUnusedSegmentsForInterval(String, Interval, Integer)} - */ - default List retrieveUnusedSegmentsForInterval(String dataSource, Interval interval) - { - return retrieveUnusedSegmentsForInterval(dataSource, interval, null); - } - /** * Retrieve all published segments which include ONLY data within the given interval and are marked as unused from the * metadata store. 
@@ -141,6 +134,10 @@ default List retrieveUnusedSegmentsForInterval(String dataSource, I * @param dataSource The data source the segments belong to * @param interval Filter the data segments to ones that include data in this interval exclusively. * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. + * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} + * with {@code used_status_last_updated} no later than this time will be included in the + * kill task. Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored * * @return DataSegments which include ONLY data within the requested interval and are marked as unused. Segments NOT * returned here may include data in the interval @@ -148,7 +145,8 @@ default List retrieveUnusedSegmentsForInterval(String dataSource, I List retrieveUnusedSegmentsForInterval( String dataSource, Interval interval, - @Nullable Integer limit + @Nullable Integer limit, + @Nullable DateTime maxUsedStatusLastUpdatedTime ); /** diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 69dca46ea1c1..0ef488aed405 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -229,30 +229,34 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin ); } - @Override - public List retrieveUnusedSegmentsForInterval(final String dataSource, final Interval interval) - { - return retrieveUnusedSegmentsForInterval(dataSource, interval, null); - } - @Override public List retrieveUnusedSegmentsForInterval( String dataSource, Interval interval, - @Nullable Integer limit + @Nullable Integer limit, + @Nullable DateTime maxUsedStatusLastUpdatedTime ) { final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) { + .retrieveUnusedSegments( + dataSource, + Collections.singletonList(interval), + limit, + null, + null, + maxUsedStatusLastUpdatedTime + ) + ) { return ImmutableList.copyOf(iterator); } } ); - log.info("Found %,d unused segments for %s for interval %s.", matchingSegments.size(), dataSource, interval); + log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] with maxUsedStatusLastUpdatedTime[%s].", + matchingSegments.size(), dataSource, interval, maxUsedStatusLastUpdatedTime); return matchingSegments; } diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index eb8a36bc3a78..94c2fae60fd5 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -163,17 +163,25 @@ Iterable iterateAllUnusedSegmentsForDatasource( Set retrieveAllDataSourceNames(); /** - * Returns top N unused segment intervals with the start time no earlier than the specified start time (if not null) - * and with the end time no later than the specified 
maxEndTime and with sed_status_last_updated time no later than - * maxLastUsedTime when ordered by segment start time, end time. Any segment having no used_status_last_updated time - * due to upgrade from legacy Druid means maxUsedFlagLastUpdatedTime is ignored for that segment. + * Returns a list of up to {@code limit} unused segment intervals for the specified datasource. Segments are filtered based on the following criteria: + * + *
<li> The start time of the segment must be no earlier than the specified {@code minStartTime} (if not null). </li>
+ * <li> The end time of the segment must be no later than the specified {@code maxEndTime}. </li>
+ * <li> The {@code used_status_last_updated} time of the segment must be no later than {@code maxUsedStatusLastUpdatedTime}.
+ * Segments that have no {@code used_status_last_updated} time (due to an upgrade from legacy Druid) will
+ * have {@code maxUsedStatusLastUpdatedTime} ignored. </li>
+ *
+ * <p>
+ * The list of intervals is ordered by segment start time and then by end time.
+ * </p>
    */ List getUnusedSegmentIntervals( String dataSource, DateTime minStartTime, DateTime maxEndTime, int limit, - DateTime maxUsedFlagLastUpdatedTime); + DateTime maxUsedStatusLastUpdatedTime + ); @VisibleForTesting void poll(); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 12d43ec5b76a..0b423006152e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -687,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -994,7 +994,7 @@ public Iterable iterateAllUnusedSegmentsForDatasource( ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval); try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) { + queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder, null)) { return ImmutableList.copyOf(iterator); } } @@ -1138,7 +1138,7 @@ public List getUnusedSegmentIntervals( @Nullable final DateTime minStartTime, final DateTime maxEndTime, final int limit, - DateTime maxUsedFlagLastUpdatedTime + DateTime maxUsedStatusLastUpdatedTime ) { // Note that we handle the case where used_status_last_updated IS NULL here to allow smooth transition to Druid version that uses used_status_last_updated column @@ -1162,7 +1162,7 @@ public List inTransaction(Handle handle, TransactionStatus status) .setMaxRows(limit) .bind("dataSource", dataSource) .bind("end", maxEndTime.toString()) - .bind("used_status_last_updated", maxUsedFlagLastUpdatedTime.toString()) + .bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString()) .map( new BaseResultSetMapper() { @@ -1182,7 +1182,6 @@ protected Interval mapInternal(int index, Map row) Iterator iter = sql.iterator(); - List result = Lists.newArrayListWithCapacity(limit); for (int i = 0; i < limit && iter.hasNext(); i++) { try { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 545acb84507e..3bd7c48ad051 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; @@ -117,7 +118,16 @@ public CloseableIterator retrieveUsedSegments( final Collection intervals ) { - return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null); + return retrieveSegments( + dataSource, + intervals, + IntervalMode.OVERLAPS, + true, + null, + null, + null, + null + ); } /** @@ -135,6 +145,10 @@ public CloseableIterator retrieveUsedSegments( * lexigraphically 
if sortOrder is DESC. * @param sortOrder Specifies the order with which to return the matching segments by start time, end time. * A null value indicates that order does not matter. + * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code intervals} + * with {@code used_status_last_updated} no later than this time will be included in the + * iterator. Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored * Returns a closeable iterator. You should close it when you are done. */ @@ -143,10 +157,20 @@ public CloseableIterator retrieveUnusedSegments( final Collection intervals, @Nullable final Integer limit, @Nullable final String lastSegmentId, - @Nullable final SortOrder sortOrder + @Nullable final SortOrder sortOrder, + @Nullable final DateTime maxUsedStatusLastUpdatedTime ) { - return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder); + return retrieveSegments( + dataSource, + intervals, + IntervalMode.CONTAINS, + false, + limit, + lastSegmentId, + sortOrder, + maxUsedStatusLastUpdatedTime + ); } /** @@ -241,6 +265,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) true, null, null, + null, null ), DataSegment::getId @@ -379,12 +404,13 @@ private CloseableIterator retrieveSegments( final boolean used, @Nullable final Integer limit, @Nullable final String lastSegmentId, - @Nullable final SortOrder sortOrder + @Nullable final SortOrder sortOrder, + @Nullable final DateTime maxUsedStatusLastUpdatedTime ) { if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { return CloseableIterators.withEmptyBaggage( - retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder) + retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime) ); } else { final List> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH); @@ -399,7 +425,8 @@ private CloseableIterator retrieveSegments( used, limitPerBatch, lastSegmentId, - sortOrder + sortOrder, + maxUsedStatusLastUpdatedTime ); if (limitPerBatch != null) { // If limit is provided, we need to shrink the limit for subsequent batches or circuit break if @@ -425,7 +452,8 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( final boolean used, @Nullable final Integer limit, @Nullable final String lastSegmentId, - @Nullable final SortOrder sortOrder + @Nullable final SortOrder sortOrder, + @Nullable final DateTime maxUsedStatusLastUpdatedTime ) { // Check if the intervals all support comparing as strings. If so, bake them into the SQL. @@ -438,6 +466,12 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector); } + // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null. 
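+ // For example, if the caller binds this cutoff to (now - bufferPeriod), as the coordinator duty does, a segment
+ // that was marked unused inside the buffer window fails the <= comparison below and is retained rather than killed.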
+ final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null; + if (addMaxUsedLastUpdatedTimeFilter) { + sb.append(" AND (used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated)"); + } + if (lastSegmentId != null) { sb.append( StringUtils.format( @@ -462,10 +496,16 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( .setFetchSize(connector.getStreamingFetchSize()) .bind("used", used) .bind("dataSource", dataSource); + + if (addMaxUsedLastUpdatedTimeFilter) { + sql.bind("used_status_last_updated", maxUsedStatusLastUpdatedTime.toString()); + } + if (lastSegmentId != null) { sql.bind("id", lastSegmentId); } - if (null != limit) { + + if (limit != null) { sql.setMaxRows(limit); } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 6a8e515b3270..422803492d8e 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.rpc.ServiceRetryPolicy; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -73,33 +74,20 @@ public interface OverlordClient /** * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}. - * - * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs - * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to - * monitor its progress through the {@link #taskStatus(String)} API. - * - * @param idPrefix Descriptive prefix to include at the start of task IDs - * @param dataSource Datasource to kill - * @param interval Interval to kill - * - * @return future with task ID - */ - default ListenableFuture runKillTask(String idPrefix, String dataSource, Interval interval) - { - return runKillTask(idPrefix, dataSource, interval, null); - } - - /** - * Run a "kill" task for a particular datasource and interval. Shortcut to {@link #runTask(String, Object)}. - * * The kill task deletes all unused segment records from deep storage and the metadata store. The task runs * asynchronously after the API call returns. The resolved future is the ID of the task, which can be used to * monitor its progress through the {@link #taskStatus(String)} API. * * @param idPrefix Descriptive prefix to include at the start of task IDs * @param dataSource Datasource to kill - * @param interval Interval to kill + * @param interval Umbrella interval to be considered by the kill task. Note that unused segments falling in this + * widened umbrella interval may have different {@code used_status_last_updated} time, so the kill task + * should also filter by {@code maxUsedStatusLastUpdatedTime} * @param maxSegmentsToKill The maximum number of segments to kill + * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} + * with {@code used_status_last_updated} no later than this time will be included in the + * kill task. 
Segments without {@code used_status_last_updated} time (due to an upgrade + * from legacy Druid) will have {@code maxUsedStatusLastUpdatedTime} ignored * * @return future with task ID */ @@ -107,7 +95,8 @@ default ListenableFuture runKillTask( String idPrefix, String dataSource, Interval interval, - @Nullable Integer maxSegmentsToKill + @Nullable Integer maxSegmentsToKill, + @Nullable DateTime maxUsedStatusLastUpdatedTime ) { final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval); @@ -117,7 +106,8 @@ default ListenableFuture runKillTask( interval, false, null, - maxSegmentsToKill + maxSegmentsToKill, + maxUsedStatusLastUpdatedTime ); return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index da7847f45053..fb166362345c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -20,10 +20,10 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.JodaUtils; @@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -46,13 +47,19 @@ import java.util.concurrent.ConcurrentHashMap; /** + *

* Completely removes information about unused segments that have an interval end that comes before - * now - {@link #retainDuration} from the metadata store. retainDuration can be a positive or negative duration, - * negative meaning the interval end target will be in the future. Also, retainDuration can be ignored, - * meaning that there is no upper bound to the end interval of segments that will be killed. This action is called - * "to kill a segment". + * now - {@link #durationToRetain} from the metadata store. {@link #durationToRetain} can be a positive or negative duration, + * negative meaning the interval end target will be in the future. Also, {@link #durationToRetain} can be ignored if + * {@link #ignoreDurationToRetain} is enabled, meaning that there is no upper bound to the end interval of segments that + * will be killed. The umbrella interval of the unused segments per datasource to be killed is determined by + * {@link #findIntervalForKill(String, DateTime)}, which takes into account the configured {@link #bufferPeriod}. However, + * the kill task still needs to apply the {@link #bufferPeriod} check to the unused segments in the widened interval, + * as there can be multiple unused segments with different {@code used_status_last_updated} times. + *

    *

    - * See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. + * See {@link org.apache.druid.indexing.common.task.KillUnusedSegmentsTask}. + *

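+ * For example, with druid.coordinator.kill.bufferPeriod set to P1D, a segment that was marked unused six hours ago + * is not killed even when the umbrella interval computed for its datasource covers it; it becomes eligible for + * deletion only once its {@code used_status_last_updated} time is at least the buffer period in the past. + *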
*/ public class KillUnusedSegments implements CoordinatorDuty { @@ -63,9 +70,9 @@ public class KillUnusedSegments implements CoordinatorDuty && (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX)); private static final Logger log = new Logger(KillUnusedSegments.class); - private final long period; - private final long retainDuration; - private final boolean ignoreRetainDuration; + private final Duration period; + private final Duration durationToRetain; + private final boolean ignoreDurationToRetain; private final int maxSegmentsToKill; /** @@ -73,7 +80,7 @@ public class KillUnusedSegments implements CoordinatorDuty * datasource. */ private final Map datasourceToLastKillIntervalEnd; - private long lastKillTime = 0; + private DateTime lastKillTime; private final long bufferPeriod; private final SegmentsMetadataManager segmentsMetadataManager; @@ -85,32 +92,37 @@ public KillUnusedSegments( DruidCoordinatorConfig config ) { - this.period = config.getCoordinatorKillPeriod().getMillis(); - Preconditions.checkArgument( - this.period >= config.getCoordinatorIndexingPeriod().getMillis(), - "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod" - ); - - this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain(); - this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis(); - if (this.ignoreRetainDuration) { + if (config.getCoordinatorKillPeriod().getMillis() < config.getCoordinatorIndexingPeriod().getMillis()) { + throw InvalidInput.exception( + "druid.coordinator.kill.period[%s] must be >= druid.coordinator.period.indexingPeriod[%s]", + config.getCoordinatorKillPeriod(), + config.getCoordinatorIndexingPeriod() + ); + } + if (config.getCoordinatorKillMaxSegments() < 0) { + throw InvalidInput.exception( + "druid.coordinator.kill.maxSegments[%s] is invalid. It must be a non-negative integer.", + config.getCoordinatorKillMaxSegments() + ); + } + this.period = config.getCoordinatorKillPeriod(); + this.ignoreDurationToRetain = config.getCoordinatorKillIgnoreDurationToRetain(); + this.durationToRetain = config.getCoordinatorKillDurationToRetain(); + if (this.ignoreDurationToRetain) { log.debug( - "druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill " - + "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.", - this.retainDuration + "druid.coordinator.kill.durationToRetain[%s] will be ignored when discovering segments to kill " + + "because druid.coordinator.kill.ignoreDurationToRetain is set to true.", + this.durationToRetain ); } this.bufferPeriod = config.getCoordinatorKillBufferPeriod().getMillis(); - this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments(); - Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); - datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>(); log.info( - "Kill Task scheduling enabled with period [%s], retainDuration [%s], bufferPeriod [%s], maxSegmentsToKill [%s]", + "Kill task scheduling enabled with period[%s], durationToRetain[%s], bufferPeriod[%s], maxSegmentsToKill[%s]", this.period, - this.ignoreRetainDuration ? "IGNORING" : this.retainDuration, + this.ignoreDurationToRetain ? 
"IGNORING" : this.durationToRetain, this.bufferPeriod, this.maxSegmentsToKill ); @@ -122,9 +134,12 @@ public KillUnusedSegments( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - final long currentTimeMillis = System.currentTimeMillis(); - if (lastKillTime + period > currentTimeMillis) { - log.debug("Skipping kill of unused segments as kill period has not elapsed yet."); + if (!canDutyRun()) { + log.debug( + "Skipping KillUnusedSegments until period[%s] has elapsed after lastKillTime[%s].", + period, + lastKillTime + ); return params; } @@ -159,10 +174,9 @@ DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams params) dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames(); } - log.debug("Killing unused segments in datasources: %s", dataSourcesToKill); - lastKillTime = System.currentTimeMillis(); + log.debug("Killing unused segments for datasources[%s]", dataSourcesToKill); + lastKillTime = DateTimes.nowUtc(); taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, availableKillTaskSlots); - } // any datasources that are no longer being considered for kill should have their @@ -196,26 +210,31 @@ private int killUnusedSegments( + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots)); break; } - final Interval intervalToKill = findIntervalForKill(dataSource); + final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod); + final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime); if (intervalToKill == null) { datasourceToLastKillIntervalEnd.remove(dataSource); continue; } try { - FutureUtils.getUnchecked(overlordClient.runKillTask( - TASK_ID_PREFIX, - dataSource, - intervalToKill, - maxSegmentsToKill - ), true); + FutureUtils.getUnchecked( + overlordClient.runKillTask( + TASK_ID_PREFIX, + dataSource, + intervalToKill, + maxSegmentsToKill, + maxUsedStatusLastUpdatedTime + ), + true + ); ++submittedTasks; datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); } catch (Exception ex) { - log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); + log.error(ex, "Failed to submit kill task for dataSource[%s] in interval[%s]", dataSource, intervalToKill); if (Thread.currentThread().isInterrupted()) { - log.warn("skipping kill task scheduling because thread is interrupted."); + log.warn("Skipping kill task scheduling because thread is interrupted."); break; } } @@ -244,14 +263,13 @@ private int killUnusedSegments( * Calculates the interval for which segments are to be killed in a datasource. */ @Nullable - private Interval findIntervalForKill(String dataSource) + private Interval findIntervalForKill(String dataSource, DateTime maxUsedStatusLastUpdatedTime) { - final DateTime maxEndTime = ignoreRetainDuration + final DateTime maxEndTime = ignoreDurationToRetain ? 
DateTimes.COMPARE_DATE_AS_STRING_MAX - : DateTimes.nowUtc().minus(retainDuration); - + : DateTimes.nowUtc().minus(durationToRetain); List unusedSegmentIntervals = segmentsMetadataManager - .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, DateTimes.nowUtc().minus(bufferPeriod)); + .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, maxUsedStatusLastUpdatedTime); if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) { return null; @@ -270,6 +288,11 @@ private int getAvailableKillTaskSlots(int killTaskCapacity, int numActiveKillTas ); } + private boolean canDutyRun() + { + return lastKillTime == null || !DateTimes.nowUtc().isBefore(lastKillTime.plus(period)); + } + @VisibleForTesting static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio, int maxKillTaskSlots) { diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index a539a48ecb74..1546544029e8 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -380,7 +380,7 @@ public Response killUnusedSegmentsInInterval( final Interval theInterval = Intervals.of(interval.replace('_', '/')); try { final String killTaskId = FutureUtils.getUnchecked( - overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null), + overlordClient.runKillTask("api-issued", dataSourceName, theInterval, null, null), true ); auditManager.doAudit( diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java index 60edff930771..0059d048e063 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java @@ -48,7 +48,8 @@ public void setUp() INTERVAL, true, BATCH_SIZE, - LIMIT + LIMIT, + null ); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 4c3534feacdb..0626792f1fd4 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -372,10 +372,10 @@ public int getSqlMetadataMaxRetry() private void markAllSegmentsUnused() { - markAllSegmentsUnused(SEGMENTS); + markAllSegmentsUnused(SEGMENTS, DateTimes.nowUtc()); } - private void markAllSegmentsUnused(Set segments) + private void markAllSegmentsUnused(Set segments, DateTime usedStatusLastUpdatedTime) { for (final DataSegment segment : segments) { Assert.assertEquals( @@ -386,7 +386,10 @@ private void markAllSegmentsUnused(Set segments) "UPDATE %s SET used = false, used_status_last_updated = :used_status_last_updated WHERE id = :id", derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() ); - return handle.createStatement(request).bind("id", segment.getId().toString()).bind("used_status_last_updated", DateTimes.nowUtc().toString()).execute(); + return handle.createStatement(request) + .bind("id", 
segment.getId().toString()) + .bind("used_status_last_updated", usedStatusLastUpdatedTime.toString() + ).execute(); } ) ); @@ -977,7 +980,7 @@ public void testRetrieveUsedSegmentForId() public void testRetrieveSegmentForId() { insertUsedSegments(ImmutableSet.of(defaultSegment)); - markAllSegmentsUnused(ImmutableSet.of(defaultSegment)); + markAllSegmentsUnused(ImmutableSet.of(defaultSegment), DateTimes.nowUtc()); Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true)); } @@ -1147,11 +1150,12 @@ public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final List actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval( DS.WIKI, Intervals.of("1900/3000"), + null, null ); @@ -1163,13 +1167,14 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOE public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int requestedLimit = segments.size(); final List actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval( DS.WIKI, Intervals.of("1900/3000"), - requestedLimit + requestedLimit, + null ); Assert.assertEquals(requestedLimit, actualUnusedSegments.size()); @@ -1180,13 +1185,14 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throw public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int requestedLimit = segments.size() - 1; final List actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval( DS.WIKI, Intervals.of("1900/3000"), - requestedLimit + requestedLimit, + null ); Assert.assertEquals(requestedLimit, actualUnusedSegments.size()); @@ -1197,13 +1203,14 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throw public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int limit = segments.size() + 1; final List actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval( DS.WIKI, Intervals.of("1900/3000"), - limit + limit, + null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(actualUnusedSegments.containsAll(segments)); @@ -1213,7 +1220,7 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() th public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOException { final List segments = createAndGetUsedYearSegments(1905, 1910); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final Interval outOfRangeInterval = Intervals.of("1700/1800"); Assert.assertTrue(segments.stream() @@ -1223,7 +1230,8 
@@ public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOE final List actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval( DS.WIKI, outOfRangeInterval, - limit + limit, + null ); Assert.assertEquals(0, actualUnusedSegments.size()); } @@ -1232,12 +1240,13 @@ public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOE public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), null, null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); @@ -1248,12 +1257,13 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final ImmutableList actualUnusedSegments = retrieveUnusedSegments( ImmutableList.of(), null, null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); @@ -1264,7 +1274,7 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId( public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws IOException { final List segments = createAndGetUsedYearSegments(2033, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); String lastSegmentId = segments.get(9).getId().toString(); final List expectedSegmentsAscOrder = segments.stream() @@ -1274,6 +1284,7 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegment ImmutableList.of(), null, lastSegmentId, + null, null ); Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); @@ -1283,7 +1294,8 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegment ImmutableList.of(), null, lastSegmentId, - SortOrder.ASC + SortOrder.ASC, + null ); Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments); @@ -1297,7 +1309,8 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegment ImmutableList.of(), null, lastSegmentId, - SortOrder.DESC + SortOrder.DESC, + null ); Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size()); Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments); @@ -1307,12 +1320,13 @@ public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegment public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), segments.size(), null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); @@ 
-1323,13 +1337,14 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() th public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int requestedLimit = segments.size() - 1; final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), requestedLimit, null, + null, null ); Assert.assertEquals(requestedLimit, actualUnusedSegments.size()); @@ -1340,7 +1355,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() th public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndLastSegmentId() throws IOException { final List segments = createAndGetUsedYearSegments(2034, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int requestedLimit = segments.size(); final String lastSegmentId = segments.get(4).getId().toString(); @@ -1348,6 +1363,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAn segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), requestedLimit, lastSegmentId, + null, null ); Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size()); @@ -1361,7 +1377,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAn public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentId() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final int requestedLimit = segments.size() - 1; final String lastSegmentId = segments.get(4).getId().toString(); @@ -1369,6 +1385,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentI segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), requestedLimit, lastSegmentId, + null, null ); Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size()); @@ -1382,12 +1399,13 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndLastSegmentI public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), segments.size() + 1, null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); @@ -1398,7 +1416,7 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervals() throws IOExceptio public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOException { final List segments = createAndGetUsedYearSegments(1905, 1910); - markAllSegmentsUnused(new HashSet<>(segments)); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); final Interval outOfRangeInterval = Intervals.of("1700/1800"); Assert.assertTrue(segments.stream() @@ -1408,11 +1426,82 @@ public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOExcepti ImmutableList.of(outOfRangeInterval), null, null, - 
null + null, + null ); Assert.assertEquals(0, actualUnusedSegments.size()); } + @Test + public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime() throws IOException + { + final List segments = createAndGetUsedYearSegments(1905, 1910); + markAllSegmentsUnused(new HashSet<>(segments), DateTimes.nowUtc()); + + final Interval interval = Intervals.of("1905/1920"); + + final ImmutableList actualUnusedSegments1 = retrieveUnusedSegments( + ImmutableList.of(interval), + null, + null, + null, + DateTimes.nowUtc() + ); + Assert.assertEquals(5, actualUnusedSegments1.size()); + + final ImmutableList actualUnusedSegments2 = retrieveUnusedSegments( + ImmutableList.of(interval), + null, + null, + null, + DateTimes.nowUtc().minusHours(1) + ); + Assert.assertEquals(0, actualUnusedSegments2.size()); + } + + @Test + public void testRetrieveUnusedSegmentsWithMaxUsedStatusLastUpdatedTime2() throws IOException + { + final List segments = createAndGetUsedYearSegments(1900, 1950); + final List evenYearSegments = new ArrayList<>(); + final List oddYearSegments = new ArrayList<>(); + + for (int i = 0; i < segments.size(); i++) { + DataSegment dataSegment = segments.get(i); + if (i % 2 == 0) { + evenYearSegments.add(dataSegment); + } else { + oddYearSegments.add(dataSegment); + } + } + + final DateTime maxUsedStatusLastUpdatedTime1 = DateTimes.nowUtc(); + markAllSegmentsUnused(new HashSet<>(oddYearSegments), maxUsedStatusLastUpdatedTime1); + + final DateTime maxUsedStatusLastUpdatedTime2 = DateTimes.nowUtc(); + markAllSegmentsUnused(new HashSet<>(evenYearSegments), maxUsedStatusLastUpdatedTime2); + + final Interval interval = Intervals.of("1900/1950"); + + final ImmutableList actualUnusedSegments1 = retrieveUnusedSegments( + ImmutableList.of(interval), + null, + null, + null, + maxUsedStatusLastUpdatedTime1 + ); + Assert.assertEquals(oddYearSegments.size(), actualUnusedSegments1.size()); + + final ImmutableList actualUnusedSegments2 = retrieveUnusedSegments( + ImmutableList.of(interval), + null, + null, + null, + maxUsedStatusLastUpdatedTime2 + ); + Assert.assertEquals(segments.size(), actualUnusedSegments2.size()); + } + @Test public void testSimpleUnusedList() throws IOException { @@ -1423,7 +1512,9 @@ public void testSimpleUnusedList() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval() + defaultSegment.getInterval(), + null, + null ) ) ); @@ -1439,7 +1530,8 @@ public void testSimpleUnusedListWithLimit() throws IOException coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), defaultSegment.getInterval(), - limit + limit, + null ) ); Assert.assertEquals(limit, retreivedUnusedSegments.size()); @@ -1551,7 +1643,9 @@ public void testUnusedOverlapLow() throws IOException new Interval( defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart().plus(1) - ) + ), + null, + null ).isEmpty() ); } @@ -1564,7 +1658,9 @@ public void testUnusedUnderlapLow() throws IOException Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()), + null, + null ).isEmpty() ); } @@ -1578,7 +1674,9 @@ public void testUnusedUnderlapHigh() throws IOException Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( 
defaultSegment.getDataSource(), - new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)), + null, + null ).isEmpty() ); } @@ -1591,7 +1689,9 @@ public void testUnusedOverlapHigh() throws IOException Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)), + null, + null ).isEmpty() ); } @@ -1606,7 +1706,9 @@ public void testUnusedBigOverlap() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - Intervals.of("2000/2999") + Intervals.of("2000/2999"), + null, + null ) ) ); @@ -1622,7 +1724,9 @@ public void testUnusedLowRange() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)), + null, + null ) ) ); @@ -1631,7 +1735,9 @@ public void testUnusedLowRange() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)), + null, + null ) ) ); @@ -1647,7 +1753,9 @@ public void testUnusedHighRange() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)), + null, + null ) ) ); @@ -1656,7 +1764,9 @@ public void testUnusedHighRange() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( defaultSegment.getDataSource(), - defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)), + null, + null ) ) ); @@ -2189,7 +2299,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions()); // now drop the used segment previously loaded: - markAllSegmentsUnused(ImmutableSet.of(segment)); + markAllSegmentsUnused(ImmutableSet.of(segment), DateTimes.nowUtc()); // and final load, this reproduces an issue that could happen with multiple streaming appends, // followed by a reindex, followed by a drop, and more streaming data coming in for same interval @@ -2208,7 +2318,7 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() } /** - * Slightly different that the above test but that involves reverted compaction + * Slightly different from the above test that involves reverted compaction * 1) used segments of version = A, id = 0, 1, 2 * 2) overwrote segments of version = B, id = 0 <= compaction * 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing @@ -2354,7 +2464,7 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() // 5) reverted compaction (by marking B_0 as unused) // 
Reverting compaction is a manual metadata update, which is basically the following two steps: - markAllSegmentsUnused(ImmutableSet.of(compactedSegment)); // <- drop compacted segment + markAllSegmentsUnused(ImmutableSet.of(compactedSegment), DateTimes.nowUtc()); // <- drop compacted segment // pending: version = A, id = 0,1,2 // version = B, id = 1 // @@ -2896,7 +3006,9 @@ public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( existingSegment1.getDataSource(), - existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)) + existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)), + null, + null ) ) ); @@ -2905,7 +3017,9 @@ public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( existingSegment2.getDataSource(), - existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)) + existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)), + null, + null ) ) ); @@ -2928,7 +3042,9 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( existingSegment1.getDataSource(), - existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)) + existingSegment1.getInterval().withEnd(existingSegment1.getInterval().getEnd().plus(1)), + null, + null ) ) ); @@ -2937,7 +3053,9 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException ImmutableSet.copyOf( coordinator.retrieveUnusedSegmentsForInterval( existingSegment2.getDataSource(), - existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)) + existingSegment2.getInterval().withEnd(existingSegment2.getInterval().getEnd().plusYears(1)), + null, + null ) ) ); @@ -3100,7 +3218,7 @@ public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOExcepti Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments)); // Mark the tombstone as unused - markAllSegmentsUnused(tombstones); + markAllSegmentsUnused(tombstones, DateTimes.nowUtc()); final Collection allUsedSegments = coordinator.retrieveAllUsedSegments( DS.WIKI, @@ -3154,7 +3272,7 @@ public void testTimelineWith1CorePartitionTombstone() throws IOException Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments)); // Mark the tombstone as unused - markAllSegmentsUnused(tombstones); + markAllSegmentsUnused(tombstones, DateTimes.nowUtc()); final Collection allUsedSegments = coordinator.retrieveAllUsedSegments( DS.WIKI, @@ -3205,7 +3323,8 @@ private ImmutableList retrieveUnusedSegments( final List intervals, final Integer limit, final String lastSegmentId, - final SortOrder sortOrder + final SortOrder sortOrder, + final DateTime maxUsedStatusLastUpdatedTime ) { return derbyConnector.inReadOnlyTransaction( @@ -3217,7 +3336,7 @@ private ImmutableList retrieveUnusedSegments( derbyConnectorRule.metadataTablesConfigSupplier().get(), mapper ) - .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder)) { + .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java 
b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 7ba5916a7710..cad3f21f06a2 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -433,6 +433,7 @@ public void test_taskPayload() throws ExecutionException, InterruptedException, null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 5adb345c9fa9..a174b9e2264a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; /** @@ -68,10 +69,10 @@ @RunWith(MockitoJUnitRunner.class) public class KillUnusedSegmentsTest { - private static final int MAX_SEGMENTS_TO_KILL = 10; - private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2); + private static final Duration INDEXING_PERIOD = Duration.standardSeconds(1); + private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardSeconds(1); private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1); - private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1); + private static final int MAX_SEGMENTS_TO_KILL = 10; private static final String DATASOURCE = "DS1"; @Mock @@ -174,7 +175,7 @@ public void testRunWithNoIntervalShouldNotKillAnySegments() mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); target.run(params); Mockito.verify(overlordClient, Mockito.never()) - .runKillTask(anyString(), anyString(), any(Interval.class)); + .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class)); } @Test @@ -192,7 +193,7 @@ public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); target.run(params); Mockito.verify(overlordClient, Mockito.never()) - .runKillTask(anyString(), anyString(), any(Interval.class)); + .runKillTask(anyString(), anyString(), any(Interval.class), anyInt(), any(DateTime.class)); } @Test @@ -364,48 +365,24 @@ private void runAndVerifyKillInterval(Interval expectedKillInterval) { int limit = config.getCoordinatorKillMaxSegments(); Mockito.doReturn(Futures.immediateFuture("ok")) - .when(overlordClient) - .runKillTask( - ArgumentMatchers.anyString(), - ArgumentMatchers.anyString(), - ArgumentMatchers.any(Interval.class), - ArgumentMatchers.anyInt()); + .when(overlordClient) + .runKillTask( + ArgumentMatchers.anyString(), + ArgumentMatchers.anyString(), + ArgumentMatchers.any(Interval.class), + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(DateTime.class)); target.runInternal(params); Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( ArgumentMatchers.anyString(), ArgumentMatchers.eq(DATASOURCE), ArgumentMatchers.eq(expectedKillInterval), - ArgumentMatchers.eq(limit) + ArgumentMatchers.eq(limit), + ArgumentMatchers.any() ); } - private void runAndVerifyKillIntervals(List expectedKillIntervals) - { - int limit = config.getCoordinatorKillMaxSegments(); - Mockito.doReturn(Futures.immediateFuture("ok")) - .when(overlordClient) - 
.runKillTask( - ArgumentMatchers.anyString(), - ArgumentMatchers.anyString(), - ArgumentMatchers.any(Interval.class), - ArgumentMatchers.anyInt()); - for (int i = 0; i < expectedKillIntervals.size(); i++) { - target.run(params); - verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd())); - verifyStats(9, 1, 10); - } - - for (Interval expectedKillInterval : expectedKillIntervals) { - Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( - ArgumentMatchers.anyString(), - ArgumentMatchers.eq(DATASOURCE), - ArgumentMatchers.eq(expectedKillInterval), - ArgumentMatchers.eq(limit) - ); - } - } - private void verifyStats(int availableSlots, int submittedTasks, int maxSlots) { verifyStats(availableSlots, submittedTasks, maxSlots, 1); @@ -430,7 +407,8 @@ private void runAndVerifyNoKill() ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), ArgumentMatchers.any(Interval.class), - ArgumentMatchers.anyInt() + ArgumentMatchers.anyInt(), + ArgumentMatchers.any(DateTime.class) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index cfb8fec941e0..1e2f2d462c45 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -217,7 +217,7 @@ public List getUnusedSegmentIntervals( @Nullable final DateTime minStartTime, final DateTime maxEndTime, final int limit, - final DateTime maxUsedFlagLastUpdatedTime + final DateTime maxUsedStatusLastUpdatedTime ) { return null; diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index b83ebf67527f..089cd2e1d29c 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -591,7 +591,7 @@ public void testKillSegmentsInIntervalInDataSource() Interval theInterval = Intervals.of(interval.replace('_', '/')); OverlordClient overlordClient = EasyMock.createStrictMock(OverlordClient.class); - EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null)) + EasyMock.expect(overlordClient.runKillTask("api-issued", "datasource1", theInterval, null, null)) .andReturn(Futures.immediateFuture("kill_task_1")); EasyMock.replay(overlordClient, server);
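
For reviewers, a minimal sketch of the new call pattern introduced above (assuming an OverlordClient handle named overlordClient is in scope; the datasource, interval, limit, and buffer period are illustrative and not part of this patch):

    // Compute the cutoff the same way the KillUnusedSegments duty does: now minus the buffer period.
    final Duration bufferPeriod = Duration.standardDays(30);
    final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod);

    // Submit a kill task for the umbrella interval. Unused segments whose used_status_last_updated
    // falls after the cutoff (i.e. inside the buffer window) are retained rather than deleted.
    final String taskId = FutureUtils.getUnchecked(
        overlordClient.runKillTask(
            "autokill",                                // idPrefix for the generated task ID
            "wikipedia",                               // datasource whose unused segments to kill
            Intervals.of("2023-01-01/2024-01-01"),     // umbrella interval to consider
            100,                                       // maxSegmentsToKill
            maxUsedStatusLastUpdatedTime               // honor the buffer period
        ),
        true
    );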