From a372da0fb34e5e0369a19542b6c4faca14279e8e Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Sun, 23 Jul 2023 12:58:10 -0700
Subject: [PATCH 01/10] split KillUnusedSegmentsTask to smaller batches

Processing in smaller chunks allows the task execution to yield the
TaskLockbox lock, so the overlord remains responsive to other tasks and
users while this particular kill task is executing.
---
 .../common/task/KillUnusedSegmentsTask.java   | 43 +++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 0dbceaa7e78b..67d36960948f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
@@ -60,6 +61,10 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
 
+  // We split this to try and keep each nuke operation relatively short, in the case that either
+  // the database or the storage layer is particularly slow.
+  private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;
+
   private final boolean markAsUnused;
 
   @JsonCreator
@@ -114,23 +119,37 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }
 
     // List unused segments
-    final List<DataSegment> unusedSegments = toolbox
+    final List<DataSegment> allUnusedSegments = toolbox
        .getTaskActionClient()
        .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

-    if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
-      throw new ISE(
-          "Locks[%s] for task[%s] can't cover segments[%s]",
-          taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-          getId(),
-          unusedSegments
-      );
+    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, SEGMENT_NUKE_BATCH_SIZE);
+
+    // The individual activities here on the toolbox can run for a long period of time,
+    // since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the
+    // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
+    // lock to other activity that might need to happen using the overlord tasklockbox.
+
+    for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
+      if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
+        throw new ISE(
+            "Locks[%s] for task[%s] can't cover segments[%s]",
+            taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+            getId(),
+            unusedSegments
+        );
+      }
+
+      // Kill segments:
+      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
+      // segments are removed from storage, this helps maintain that we will always have a storage segment if
+      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
+      // abandoned.
+
+      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      toolbox.getDataSegmentKiller().kill(unusedSegments);
     }

-    // Kill segments
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-    toolbox.getDataSegmentKiller().kill(unusedSegments);
-
     return TaskStatus.success(getId());
   }

From 011d04331b203ea63b79ddde7b5423a0370f9c5d Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Fri, 28 Jul 2023 15:03:44 -0700
Subject: [PATCH 02/10] introduce KillUnusedSegmentsTask batchSize parameter to
 control size of batching

---
 docs/data-management/delete.md                |  4 +-
 .../common/task/KillUnusedSegmentsTask.java   | 29 ++++++-
 ...tKillUnusedSegmentsTaskQuerySerdeTest.java | 27 +++++-
 .../task/KillUnusedSegmentsTaskTest.java      | 82 ++++++++++++++++++-
 .../indexing/overlord/TaskLifecycleTest.java  |  4 +-
 ...TestIndexerMetadataStorageCoordinator.java |  7 ++
 .../ClientKillUnusedSegmentsTaskQuery.java    | 16 +++-
 .../druid/rpc/indexing/OverlordClient.java    |  2 +-
 ...ClientKillUnusedSegmentsTaskQueryTest.java | 11 ++-
 .../rpc/indexing/OverlordClientImplTest.java  |  2 +-
 10 files changed, 167 insertions(+), 17 deletions(-)

diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 60377d858ca6..373d456289d8 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -95,9 +95,11 @@ The available grammar is:
     "id": <task_id>,
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
-    "context": <task context>
+    "context": <task context>,
+    "batchSize": <batch size>
 }
 ```

 **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone.
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 67d36960948f..bd4f3e8df05f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -23,6 +23,8 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
@@ -63,9 +65,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask

   // We split this to try and keep each nuke operation relatively short, in the case that either
   // the database or the storage layer is particularly slow.
-  private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;
+  private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

   private final boolean markAsUnused;
+  private final int batchSize;
+
+  private int countBatchesIssued = 0;

   @JsonCreator
   public KillUnusedSegmentsTask(
@@ -73,7 +78,8 @@ public KillUnusedSegmentsTask(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("context") Map<String, Object> context,
-      @JsonProperty("markAsUnused") Boolean markAsUnused
+      @JsonProperty("markAsUnused") Boolean markAsUnused,
+      @JsonProperty("batchSize") Integer batchSize
   )
   {
     super(
@@ -83,6 +89,8 @@
         context
     );
     this.markAsUnused = markAsUnused != null && markAsUnused;
+    this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
+    Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
   }

   @JsonProperty
@@ -92,6 +100,13 @@ public boolean isMarkAsUnused()
     return markAsUnused;
   }

+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
   @Override
   public String getType()
   {
@@ -106,6 +121,13 @@ public Set<ResourceAction> getInputSourceResources()
     return ImmutableSet.of();
   }

+  @JsonIgnore
+  @VisibleForTesting
+  int getCountBatchesIssued()
+  {
+    return countBatchesIssued;
+  }
+
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
@@ -123,7 +145,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
        .getTaskActionClient()
        .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

-    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, SEGMENT_NUKE_BATCH_SIZE);
+    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);

     // The individual activities here on the toolbox can run for a long period of time,
     // since they involve calls to metadata storage and archival object storage.
And, the tasks take hold of the @@ -148,6 +170,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); toolbox.getDataSegmentKiller().kill(unusedSegments); + countBatchesIssued++; } return TaskStatus.success(getId()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java index e4583c91abf9..70cd5fcf19b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientKillUnusedSegmentsTaskQuerySerdeTest.java @@ -51,7 +51,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro "killTaskId", "datasource", Intervals.of("2020-01-01/P1D"), - true + true, + 99 ); final byte[] json = objectMapper.writeValueAsBytes(taskQuery); final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); @@ -59,6 +60,26 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource()); Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); + Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize())); + } + + @Test + public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefaultBatchSize() throws IOException + { + final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery( + "killTaskId", + "datasource", + Intervals.of("2020-01-01/P1D"), + true, + null + ); + final byte[] json = objectMapper.writeValueAsBytes(taskQuery); + final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class); + Assert.assertEquals(taskQuery.getId(), fromJson.getId()); + Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource()); + Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval()); + Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused()); + Assert.assertEquals(100, fromJson.getBatchSize()); } @Test @@ -69,7 +90,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro "datasource", Intervals.of("2020-01-01/P1D"), null, - true + true, + 99 ); final byte[] json = objectMapper.writeValueAsBytes(task); final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue( @@ -80,5 +102,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource()); Assert.assertEquals(task.getInterval(), taskQuery.getInterval()); Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused()); + Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 86c6aeb4095e..44d6b51d14d3 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -79,7 +80,8 @@ public void testKill() throws Exception DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null, - false + false, + null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -95,6 +97,7 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); + Assert.assertEquals(1, task.getCountBatchesIssued()); } @@ -124,7 +127,8 @@ public void testKillWithMarkUnused() throws Exception DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null, - true + true, + null ); Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); @@ -140,6 +144,7 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); + Assert.assertEquals(1, task.getCountBatchesIssued()); } @Test @@ -151,11 +156,82 @@ public void testGetInputSourceResources() DATA_SOURCE, Intervals.of("2019-03-01/2019-04-01"), null, - true + true, + null ); Assert.assertTrue(task.getInputSourceResources().isEmpty()); } + @Test + public void testKillBatchSizeOne() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final Set segments = ImmutableSet.of( + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-02-01/2019-03-01"), version), + newSegment(Intervals.of("2019-03-01/2019-04-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + ); + final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + + Assert.assertEquals(segments, announced); + + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + true, + 1 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + // we expect ALL tasks to be deleted + + final List unusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + + Assert.assertEquals(Collections.emptyList(), unusedSegments); + Assert.assertEquals(4, task.getCountBatchesIssued()); + } + + @Test + public void testKillBatchSizeThree() throws Exception + { + final String version = DateTimes.nowUtc().toString(); + final Set segments = ImmutableSet.of( + newSegment(Intervals.of("2019-01-01/2019-02-01"), version), + newSegment(Intervals.of("2019-02-01/2019-03-01"), version), + newSegment(Intervals.of("2019-03-01/2019-04-01"), version), + newSegment(Intervals.of("2019-04-01/2019-05-01"), version) + ); + final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + + Assert.assertEquals(segments, announced); + + final KillUnusedSegmentsTask task = + new KillUnusedSegmentsTask( + null, + DATA_SOURCE, + Intervals.of("2018-01-01/2020-01-01"), + null, + true, + 3 + ); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + + // we expect ALL tasks to be deleted + + final List unusedSegments = + 
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); + + Assert.assertEquals(Collections.emptyList(), unusedSegments); + Assert.assertEquals(2, task.getCountBatchesIssued()); + } + private static DataSegment newSegment(Interval interval, String version) { return new DataSegment( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 9b9d6d362249..2ea7fcedf193 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -948,7 +948,8 @@ public DataSegment apply(String input) "test_kill_task", Intervals.of("2011-04-01/P4D"), null, - false + false, + null ); final TaskStatus status = runTask(killUnusedSegmentsTask); @@ -956,6 +957,7 @@ public DataSegment apply(String input) Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals("num segments published", 0, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size()); + Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount()); Assert.assertTrue( "expected unused segments get killed", expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index d64bd1d22263..c8bf8fd28ab1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -49,6 +49,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto private final Set published = Sets.newConcurrentHashSet(); private final Set nuked = Sets.newConcurrentHashSet(); private final List unusedSegments; + private int deleteSegmentsCount = 0; public TestIndexerMetadataStorageCoordinator() { @@ -201,6 +202,7 @@ public int deletePendingSegments(String dataSource) @Override public void deleteSegments(Set segments) { + deleteSegmentsCount++; nuked.addAll(segments); } @@ -220,6 +222,11 @@ public Set getNuked() return ImmutableSet.copyOf(nuked); } + public int getDeleteSegmentsCount() + { + return deleteSegmentsCount; + } + public void setUnusedSegments(List newUnusedSegments) { synchronized (unusedSegments) { diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java index 4435e5fac4c8..779c8acc7f28 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQuery.java @@ -39,19 +39,22 @@ public class ClientKillUnusedSegmentsTaskQuery implements ClientTaskQuery private final String dataSource; private final Interval interval; private final Boolean markAsUnused; + private final Integer batchSize; @JsonCreator public ClientKillUnusedSegmentsTaskQuery( @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") 
Interval interval, - @JsonProperty("markAsUnused") Boolean markAsUnused + @JsonProperty("markAsUnused") Boolean markAsUnused, + @JsonProperty("batchSize") Integer batchSize ) { this.id = Preconditions.checkNotNull(id, "id"); this.dataSource = dataSource; this.interval = interval; this.markAsUnused = markAsUnused; + this.batchSize = batchSize; } @JsonProperty @@ -87,6 +90,12 @@ public Boolean getMarkAsUnused() return markAsUnused; } + @JsonProperty + public Integer getBatchSize() + { + return batchSize; + } + @Override public boolean equals(Object o) { @@ -100,12 +109,13 @@ public boolean equals(Object o) return Objects.equals(id, that.id) && Objects.equals(dataSource, that.dataSource) && Objects.equals(interval, that.interval) - && Objects.equals(markAsUnused, that.markAsUnused); + && Objects.equals(markAsUnused, that.markAsUnused) + && Objects.equals(batchSize, that.batchSize); } @Override public int hashCode() { - return Objects.hash(id, dataSource, interval, markAsUnused); + return Objects.hash(id, dataSource, interval, markAsUnused, batchSize); } } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index 0cb77ee7045a..51b4323a11f4 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -86,7 +86,7 @@ public interface OverlordClient default ListenableFuture runKillTask(String idPrefix, String dataSource, Interval interval) { final String taskId = IdUtils.newTaskId(idPrefix, ClientKillUnusedSegmentsTaskQuery.TYPE, dataSource, interval); - final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false); + final ClientTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(taskId, dataSource, interval, false, null); return FutureUtils.transform(runTask(taskId, taskQuery), ignored -> taskId); } diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java index 0e6c0c86cb53..af9b2c8ec11b 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientKillUnusedSegmentsTaskQueryTest.java @@ -34,13 +34,14 @@ public class ClientKillUnusedSegmentsTaskQueryTest public static final DateTime START = DateTimes.nowUtc(); private static final Interval INTERVAL = new Interval(START, START.plus(1)); private static final Boolean MARK_UNUSED = true; + private static final Integer BATCH_SIZE = 999; ClientKillUnusedSegmentsTaskQuery clientKillUnusedSegmentsQuery; @Before public void setUp() { - clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true); + clientKillUnusedSegmentsQuery = new ClientKillUnusedSegmentsTaskQuery("killTaskId", DATA_SOURCE, INTERVAL, true, BATCH_SIZE); } @After @@ -73,12 +74,18 @@ public void testGetMarkUnused() Assert.assertEquals(MARK_UNUSED, clientKillUnusedSegmentsQuery.getMarkAsUnused()); } + @Test + public void testGetBatchSize() + { + Assert.assertEquals(BATCH_SIZE, clientKillUnusedSegmentsQuery.getBatchSize()); + } + @Test public void testEquals() { EqualsVerifier.forClass(ClientKillUnusedSegmentsTaskQuery.class) .usingGetClass() - .withNonnullFields("id", "dataSource", "interval") + 
.withNonnullFields("id", "dataSource", "interval", "batchSize") .verify(); } } diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 7e747956a1a9..c1e48c496ca1 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -422,7 +422,7 @@ public void test_killPendingSegments() throws Exception public void test_taskPayload() throws ExecutionException, InterruptedException, JsonProcessingException { final String taskID = "taskId_1"; - final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null); + final ClientTaskQuery clientTaskQuery = new ClientKillUnusedSegmentsTaskQuery(taskID, "test", null, null, null); serviceClient.expectAndRespond( new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/task/" + taskID), From ffc0659d7bb3890dfcc4fdc637eb9841931daeb9 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Fri, 28 Jul 2023 16:20:55 -0700 Subject: [PATCH 03/10] switch getCountBatchesIssued from int to long, just in case --- .../indexing/common/task/KillUnusedSegmentsTask.java | 4 ++-- .../indexing/common/task/KillUnusedSegmentsTaskTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index bd4f3e8df05f..a963244c5b15 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -70,7 +70,7 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask private final boolean markAsUnused; private final int batchSize; - private int countBatchesIssued = 0; + private long countBatchesIssued = 0; @JsonCreator public KillUnusedSegmentsTask( @@ -123,7 +123,7 @@ public Set getInputSourceResources() @JsonIgnore @VisibleForTesting - int getCountBatchesIssued() + long getCountBatchesIssued() { return countBatchesIssued; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 44d6b51d14d3..9b936f52e6d6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -97,7 +97,7 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1, task.getCountBatchesIssued()); + Assert.assertEquals(1L, task.getCountBatchesIssued()); } @@ -144,7 +144,7 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1, task.getCountBatchesIssued()); + Assert.assertEquals(1L, task.getCountBatchesIssued()); } @Test @@ -194,7 +194,7 @@ public void testKillBatchSizeOne() throws Exception getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, 
Intervals.of("2019/2020"));

     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(4, task.getCountBatchesIssued());
+    Assert.assertEquals(4L, task.getCountBatchesIssued());
   }

   @Test
@@ -229,7 +229,7 @@ public void testKillBatchSizeThree() throws Exception
       getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(2, task.getCountBatchesIssued());
+    Assert.assertEquals(2L, task.getCountBatchesIssued());
   }

   private static DataSegment newSegment(Interval interval, String version)

From d3d5a71ddf41dd258a7721b8bd1893b25e83cd11 Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Sat, 29 Jul 2023 14:43:47 -0700
Subject: [PATCH 04/10] split KillUnusedSegmentsTask to smaller batches (pr
 feedback)

* provide a longer explanation for kill task batchSize parameter

* add logging details for kill batch progress

* javadoc and field name changes
---
 docs/data-management/delete.md                |  8 +++++-
 .../common/task/KillUnusedSegmentsTask.java   | 28 ++++++++++++-----
 .../task/KillUnusedSegmentsTaskTest.java      |  8 +++---
 3 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md
index 373d456289d8..d6ddc6c6856c 100644
--- a/docs/data-management/delete.md
+++ b/docs/data-management/delete.md
@@ -96,10 +96,16 @@ The available grammar is:
     "dataSource": <task_datasource>,
     "interval" : <all_unused_segments_in_this_interval_will_die!>,
     "context": <task context>,
-    "batchSize": <batch size>
+    "batchSize": <optional_batch size>
 }
 ```

+Special parameter explanations:
+
+| Parameter |Default| Explanation |
+|--------------|-------|--------------------------------------------------------------------------------------------------------|
+| batchSize |100 | Split Metadata and Segment storage into smaller batches for processing. The Tasks lockbox is locked during operation, preventing other segment operations. Splitting the task into batches allows the kill job to yield to other task lockbox operations.|
+
 **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone.

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index a963244c5b15..6c6addd34a5d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -63,14 +63,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

-  // We split this to try and keep each nuke operation relatively short, in the case that either
-  // the database or the storage layer is particularly slow.
   private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

   private final boolean markAsUnused;
+
+  /** Split processing to try and keep each nuke operation relatively short, in the case that either
+   * the database or the storage layer is particularly slow.
+ */ private final int batchSize; - private long countBatchesIssued = 0; + // counter included primarily for testing + private long numBatchesProcessed = 0; @JsonCreator public KillUnusedSegmentsTask( @@ -123,9 +126,9 @@ public Set getInputSourceResources() @JsonIgnore @VisibleForTesting - long getCountBatchesIssued() + long getNumBatchesProcessed() { - return countBatchesIssued; + return numBatchesProcessed; } @Override @@ -152,6 +155,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the // lock to other activity that might need to happen using the overlord tasklockbox. + LOG.info("kill starting: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d], ([%d] segments per batch)", + getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize); + for (final List unusedSegments : unusedSegmentBatches) { if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) { throw new ISE( @@ -162,7 +168,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception ); } - // Kill segments: + // Kill segments // Order is important here: we want the nuke action to clean up the metadata records _before_ the // segments are removed from storage, this helps maintain that we will always have a storage segment if // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is @@ -170,9 +176,17 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments))); toolbox.getDataSegmentKiller().kill(unusedSegments); - countBatchesIssued++; + numBatchesProcessed++; + + if (numBatchesProcessed % 10 == 0) { + LOG.info("kill progress: id [%s] dataSource [%s] batch progress: [%d/%d]", + getId(), getDataSource(), numBatchesProcessed, allUnusedSegments.size()); + } } + LOG.info("kill complete: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d]", + getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size()); + return TaskStatus.success(getId()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 9b936f52e6d6..f57624ed7ddf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -97,7 +97,7 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getCountBatchesIssued()); + Assert.assertEquals(1L, task.getNumBatchesProcessed()); } @@ -144,7 +144,7 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-01-01/2019-02-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - Assert.assertEquals(1L, task.getCountBatchesIssued()); + Assert.assertEquals(1L, task.getNumBatchesProcessed()); } @Test @@ -194,7 +194,7 @@ public void testKillBatchSizeOne() throws Exception getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020")); Assert.assertEquals(Collections.emptyList(), 
unusedSegments);
-    Assert.assertEquals(4L, task.getCountBatchesIssued());
+    Assert.assertEquals(4L, task.getNumBatchesProcessed());
   }

   @Test
@@ -229,7 +229,7 @@ public void testKillBatchSizeThree() throws Exception
       getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

     Assert.assertEquals(Collections.emptyList(), unusedSegments);
-    Assert.assertEquals(2L, task.getCountBatchesIssued());
+    Assert.assertEquals(2L, task.getNumBatchesProcessed());
   }

   private static DataSegment newSegment(Interval interval, String version)

From 3fc9e96d2baeaabcc904826377d79c5ab3dcdc13 Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Sat, 29 Jul 2023 14:50:02 -0700
Subject: [PATCH 05/10] split KillUnusedSegmentsTask to smaller batches (better
 javadocs)

---
 .../common/task/KillUnusedSegmentsTask.java   | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 6c6addd34a5d..f5f26a197b8f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -63,13 +63,22 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

+  /**
+   * Default nuke batch size. This is a small enough size that we still get value from batching, while
+   * yielding as quickly as possible. In one real cluster environment backed by MySQL at ~2000 rows/sec,
+   * a batch size of 100 means a batch should hold the task lock for less than a second, and depending
+   * on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100 segments.
+   * Over time we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be
+   * yielding the task lockbox every 1-2 seconds.
+   */
   private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

   private final boolean markAsUnused;

-  /** Split processing to try and keep each nuke operation relatively short, in the case that either
-   * the database or the storage layer is particularly slow.
+ */ private final int batchSize; // counter included primarily for testing From 4527a29a98dbb29ba4a977c005718fac26b8a35b Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 31 Jul 2023 09:16:21 -0700 Subject: [PATCH 06/10] pr feedback - Update docs/data-management/delete.md better description of the task parameters Co-authored-by: Kashif Faraz --- docs/data-management/delete.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index d6ddc6c6856c..daf67af218ab 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -100,7 +100,7 @@ The available grammar is: } ``` -Special parameter explanations: +Some of the parameters used in the task payload are further explained below: | Parameter |Default| Explanation | |--------------|-------|--------------------------------------------------------------------------------------------------------| From 400ae475a0a33c9478df11fd290e3b80f5870d69 Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Mon, 31 Jul 2023 09:16:54 -0700 Subject: [PATCH 07/10] pr feedback - Update docs/data-management/delete.md better description of batch size parameter Co-authored-by: Kashif Faraz --- docs/data-management/delete.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index daf67af218ab..260a66a17498 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -104,7 +104,7 @@ Some of the parameters used in the task payload are further explained below: | Parameter |Default| Explanation | |--------------|-------|--------------------------------------------------------------------------------------------------------| -| batchSize |100 | Split Metadata and Segment storage into smaller batches for processing. The Tasks lockbox is locked during operation, preventing other segment operations. Splitting the task into batches allows the kill job to yield to other task lockbox operations.| +| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.| **WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and deep storage. This operation cannot be undone. 
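[Editor's note] To make the documented parameter concrete, here is a minimal sketch of how a client could construct a kill task query with an explicit batchSize, using the `ClientKillUnusedSegmentsTaskQuery` constructor introduced in PATCH 02. The task id, datasource, interval, and batch size below are hypothetical values, not part of the patches:

```java
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.java.util.common.Intervals;

public class KillTaskQueryExample
{
  public static ClientKillUnusedSegmentsTaskQuery example()
  {
    // Passing null for batchSize falls back to the task-side default of 100
    // (DEFAULT_SEGMENT_NUKE_BATCH_SIZE); here we ask for 500 segments per batch.
    return new ClientKillUnusedSegmentsTaskQuery(
        "kill_wikipedia_2019",                  // id (hypothetical)
        "wikipedia",                            // dataSource (hypothetical)
        Intervals.of("2019-01-01/2020-01-01"),  // interval whose unused segments are killed
        false,                                  // markAsUnused
        500                                     // batchSize
    );
  }
}
```

A smaller batchSize yields the lock more often at the cost of more metadata round trips; the default of 100 is the trade-off the PATCH 05 javadoc explains.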
From f35135b68bac271c3a3d389fb5d188bed00ca739 Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Mon, 31 Jul 2023 09:17:24 -0700
Subject: [PATCH 08/10] pr feedback - Update
 indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java

updated druid logging style

Co-authored-by: Kashif Faraz
---
 .../druid/indexing/common/task/KillUnusedSegmentsTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index f5f26a197b8f..8ee97f4c85c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -164,7 +164,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
     // lock to other activity that might need to happen using the overlord tasklockbox.

-    LOG.info("kill starting: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d], ([%d] segments per batch)",
+    LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches (batchSize = [%d]).",
         getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);

From 8907b997b6eb8e27ef3c51abcdcca6dcf10e8f45 Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Mon, 31 Jul 2023 09:18:02 -0700
Subject: [PATCH 09/10] pr feedback - Update
 indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java

updated druid logging style

Co-authored-by: Kashif Faraz
---
 .../druid/indexing/common/task/KillUnusedSegmentsTask.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index 8ee97f4c85c7..97d8cd26f507 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -188,8 +188,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
       numBatchesProcessed++;

       if (numBatchesProcessed % 10 == 0) {
-        LOG.info("kill progress: id [%s] dataSource [%s] batch progress: [%d/%d]",
-            getId(), getDataSource(), numBatchesProcessed, allUnusedSegments.size());
+        LOG.info("Processed [%d/%d] batches for kill task[%s].",
+            numBatchesProcessed, unusedSegmentBatches.size(), getId());
       }
     }

From 8e19cb27a4b550d10020ad26d262d55c431ccdec Mon Sep 17 00:00:00 2001
From: Jason Koch
Date: Mon, 31 Jul 2023 09:18:18 -0700
Subject: [PATCH 10/10] pr feedback - Update
 indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java

updated druid logging style

Co-authored-by: Kashif Faraz
---
 .../druid/indexing/common/task/KillUnusedSegmentsTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java index 97d8cd26f507..0d54ae96b05d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java @@ -193,7 +193,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception } } - LOG.info("kill complete: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d]", + LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.", getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size()); return TaskStatus.success(getId());
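[Editor's note] Taken together, the series reduces to one pattern: partition the full list of unused segments with Guava's `Lists.partition`, then, for each batch, nuke the metadata records before killing the files in deep storage, so the `TaskLockbox` can be yielded between batches. The following is a self-contained sketch of that control flow under simplified assumptions — plain strings stand in for `DataSegment` instances, and prints stand in for the `SegmentNukeAction` submit and `DataSegmentKiller.kill` calls:

```java
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

public class BatchedKillSketch
{
  public static void main(String[] args)
  {
    // Stand-ins for the unused segments returned by RetrieveUnusedSegmentsAction.
    final List<String> allUnusedSegments = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      allUnusedSegments.add("segment-" + i);
    }

    final int batchSize = 3;
    final List<List<String>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);

    long numBatchesProcessed = 0;
    for (final List<String> batch : unusedSegmentBatches) {
      // Order matters in the real task: metadata records are nuked before the files are
      // removed from deep storage, so a metadata record never points at an already-deleted file.
      System.out.println("nuke metadata, then kill storage, for batch " + batch);
      numBatchesProcessed++;
      // Between iterations, the real task yields the TaskLockbox to other overlord work.
    }

    // 10 segments with batchSize 3 -> ceil(10 / 3) = 4 batches.
    System.out.println("processed " + numBatchesProcessed + " batches");
  }
}
```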