Skip to content

Commit

Permalink
Split KillUnusedSegmentsTask processing into smaller chunks (#14642)
Browse files Browse the repository at this point in the history
split KillUnusedSegmentsTask to smaller batches

Processing in smaller chunks allows the task execution to yield the TaskLockbox lock,
which allows the overlord to continue being responsive to other tasks and users while
this particular kill task is executing.

* introduce KillUnusedSegmentsTask batchSize parameter to control size of batching

* provide an explanation for kill task batchSize parameter

* add logging details for kill batch progress
  • Loading branch information
jasonk000 authored Jul 31, 2023
1 parent 339b8d9 commit 44d5c1a
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 26 deletions.
10 changes: 9 additions & 1 deletion docs/data-management/delete.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,17 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>
"context": <task context>,
"batchSize": <optional_batch size>
}
```

Some of the parameters used in the task payload are further explained below:

| Parameter |Default| Explanation |
|--------------|-------|--------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|

**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
Expand Down Expand Up @@ -60,15 +63,35 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

/**
* Default nuke batch size. This is a small enough size that we still get value from batching, while
* yielding as quickly as possible. In one real cluster environment backed by MySQL (~2000 rows/sec),
* a batch size of 100 means a batch should hold the task lock for less than a second, and depending
* on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100. Over time
* we expect the S3 cleanup to get quicker, so this should be < 1 second, which means we'll be yielding
* the task lockbox every 1-2 seconds.
*/
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

private final boolean markAsUnused;

/**
* Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;

// counter included primarily for testing
private long numBatchesProcessed = 0;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize
)
{
super(
Expand All @@ -78,6 +101,8 @@ public KillUnusedSegmentsTask(
context
);
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
}

@JsonProperty
Expand All @@ -87,6 +112,13 @@ public boolean isMarkAsUnused()
return markAsUnused;
}

/**
 * Returns the maximum number of unused segments handled in a single kill batch.
 *
 * The value is the {@code batchSize} given to the constructor, or
 * {@code DEFAULT_SEGMENT_NUKE_BATCH_SIZE} when none was supplied; the constructor
 * rejects values that are not greater than zero.
 *
 * @return the per-batch segment limit used to partition the unused segment list
 */
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getBatchSize()
{
return batchSize;
}

@Override
public String getType()
{
Expand All @@ -101,6 +133,13 @@ public Set<ResourceAction> getInputSourceResources()
return ImmutableSet.of();
}

/**
 * Returns how many batches this task has finished processing so far.
 *
 * The counter is advanced in {@code runTask} once per batch, after both the
 * metadata nuke action and the storage kill for that batch have returned.
 * It is excluded from JSON serialization and exists primarily for tests.
 *
 * @return the number of completed batches
 */
@JsonIgnore
@VisibleForTesting
long getNumBatchesProcessed()
{
return numBatchesProcessed;
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Expand All @@ -114,22 +153,48 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
}

// List unused segments
final List<DataSegment> unusedSegments = toolbox
final List<DataSegment> allUnusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);

// The individual toolbox operations below can run for a long time, since they involve calls to
// metadata storage and archival object storage, and the tasks take hold of the task lockbox to run.
// By splitting the segment list into smaller batches, we get an opportunity to yield the lock to
// other activity that might need the overlord task lockbox.

LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);

for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

// Kill segments
// Order is important here: we want the nuke action to clean up the metadata records _before_ the
// segments are removed from storage. This helps maintain the invariant that a segment present in
// metadata is always present in storage. If the segment nuke throws an exception, then the
// segment cleanup is abandoned.

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;

if (numBatchesProcessed % 10 == 0) {
LOG.info("Processed [%d/%d] batches for kill task[%s].",
numBatchesProcessed, unusedSegmentBatches.size(), getId());
}
}

// Kill segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());

return TaskStatus.success(getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,35 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
true
true,
99
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
}

@Test
public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefaultBatchSize() throws IOException
{
  // Build the client-side query with batchSize left unset (null).
  final ClientKillUnusedSegmentsTaskQuery query = new ClientKillUnusedSegmentsTaskQuery(
      "killTaskId",
      "datasource",
      Intervals.of("2020-01-01/P1D"),
      true,
      null
  );

  // Round-trip: serialize the client query, then read the bytes back as the server-side task.
  final byte[] serializedQuery = objectMapper.writeValueAsBytes(query);
  final Task deserialized = objectMapper.readValue(serializedQuery, Task.class);
  final KillUnusedSegmentsTask killTask = (KillUnusedSegmentsTask) deserialized;

  // Every field carried by the query must survive the round trip.
  Assert.assertEquals(query.getId(), killTask.getId());
  Assert.assertEquals(query.getDataSource(), killTask.getDataSource());
  Assert.assertEquals(query.getInterval(), killTask.getInterval());
  Assert.assertEquals(query.getMarkAsUnused(), killTask.isMarkAsUnused());

  // With no batchSize in the payload, the task must fall back to a batch size of 100.
  Assert.assertEquals(100, killTask.getBatchSize());
}

@Test
Expand All @@ -69,7 +90,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
true
true,
99
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
Expand All @@ -80,5 +102,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -79,7 +80,8 @@ public void testKill() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
false
false,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Expand All @@ -95,6 +97,7 @@ public void testKill() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}


Expand Down Expand Up @@ -124,7 +127,8 @@ public void testKillWithMarkUnused() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
Expand All @@ -140,6 +144,7 @@ public void testKillWithMarkUnused() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}

@Test
Expand All @@ -151,11 +156,82 @@ public void testGetInputSourceResources()
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}

@Test
public void testKillBatchSizeOne() throws Exception
{
  // Publish four consecutive month-long segments sharing one version.
  final String segmentVersion = DateTimes.nowUtc().toString();
  final Set<DataSegment> publishedSegments = ImmutableSet.of(
      newSegment(Intervals.of("2019-01-01/2019-02-01"), segmentVersion),
      newSegment(Intervals.of("2019-02-01/2019-03-01"), segmentVersion),
      newSegment(Intervals.of("2019-03-01/2019-04-01"), segmentVersion),
      newSegment(Intervals.of("2019-04-01/2019-05-01"), segmentVersion)
  );
  Assert.assertEquals(
      publishedSegments,
      getMetadataStorageCoordinator().announceHistoricalSegments(publishedSegments)
  );

  // Kill over an interval covering every segment, marking them unused first, one segment per batch.
  final Interval killInterval = Intervals.of("2018-01-01/2020-01-01");
  final KillUnusedSegmentsTask killTask =
      new KillUnusedSegmentsTask(null, DATA_SOURCE, killInterval, null, true, 1);

  Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(killTask).get().getStatusCode());

  // All segments are expected to be deleted, so no unused segments may remain in 2019.
  final List<DataSegment> remainingUnused = getMetadataStorageCoordinator()
      .retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
  Assert.assertEquals(Collections.emptyList(), remainingUnused);

  // Four segments with a batch size of one means four batches.
  Assert.assertEquals(4L, killTask.getNumBatchesProcessed());
}

@Test
public void testKillBatchSizeThree() throws Exception
{
  // Publish four consecutive month-long segments sharing one version.
  final String segmentVersion = DateTimes.nowUtc().toString();
  final Set<DataSegment> publishedSegments = ImmutableSet.of(
      newSegment(Intervals.of("2019-01-01/2019-02-01"), segmentVersion),
      newSegment(Intervals.of("2019-02-01/2019-03-01"), segmentVersion),
      newSegment(Intervals.of("2019-03-01/2019-04-01"), segmentVersion),
      newSegment(Intervals.of("2019-04-01/2019-05-01"), segmentVersion)
  );
  Assert.assertEquals(
      publishedSegments,
      getMetadataStorageCoordinator().announceHistoricalSegments(publishedSegments)
  );

  // Kill over an interval covering every segment, marking them unused first, three segments per batch.
  final Interval killInterval = Intervals.of("2018-01-01/2020-01-01");
  final KillUnusedSegmentsTask killTask =
      new KillUnusedSegmentsTask(null, DATA_SOURCE, killInterval, null, true, 3);

  Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(killTask).get().getStatusCode());

  // All segments are expected to be deleted, so no unused segments may remain in 2019.
  final List<DataSegment> remainingUnused = getMetadataStorageCoordinator()
      .retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));
  Assert.assertEquals(Collections.emptyList(), remainingUnused);

  // Four segments with a batch size of three means one full batch plus one partial batch.
  Assert.assertEquals(2L, killTask.getNumBatchesProcessed());
}

private static DataSegment newSegment(Interval interval, String version)
{
return new DataSegment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,14 +948,16 @@ public DataSegment apply(String input)
"test_kill_task",
Intervals.of("2011-04-01/P4D"),
null,
false
false,
null
);

final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount());
Assert.assertTrue(
"expected unused segments get killed",
expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
private final Set<DataSegment> published = Sets.newConcurrentHashSet();
private final Set<DataSegment> nuked = Sets.newConcurrentHashSet();
private final List<DataSegment> unusedSegments;
private int deleteSegmentsCount = 0;

public TestIndexerMetadataStorageCoordinator()
{
Expand Down Expand Up @@ -201,6 +202,7 @@ public int deletePendingSegments(String dataSource)
/**
 * Test double for segment deletion: counts the invocation and remembers the
 * given segments in the {@code nuked} set instead of touching any real store.
 *
 * @param segments the segments the caller asked to delete
 */
@Override
public void deleteSegments(Set<DataSegment> segments)
{
// Count calls (not segments) so tests can assert how many delete batches were issued.
deleteSegmentsCount++;
nuked.addAll(segments);
}

Expand All @@ -220,6 +222,11 @@ public Set<DataSegment> getNuked()
return ImmutableSet.copyOf(nuked);
}

/**
 * Returns how many times {@code deleteSegments} has been invoked on this coordinator.
 *
 * @return the number of {@code deleteSegments} calls, regardless of how many segments each call carried
 */
public int getDeleteSegmentsCount()
{
return deleteSegmentsCount;
}

public void setUnusedSegments(List<DataSegment> newUnusedSegments)
{
synchronized (unusedSegments) {
Expand Down
Loading

0 comments on commit 44d5c1a

Please sign in to comment.