split KillUnusedSegmentsTask to processing in smaller chunks #14642

Merged 10 commits on Jul 31, 2023
10 changes: 9 additions & 1 deletion docs/data-management/delete.md
@@ -95,9 +95,17 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>
"context": <task context>,
"batchSize": <optional_batch size>
}
```

Some of the parameters used in the task payload are further explained below:

| Parameter |Default| Explanation |
|--------------|-------|--------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
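
For example, a minimal `kill` task payload that overrides the default batch size might look like the following (the task ID, datasource, and interval are placeholders for illustration, not values from this change):

```
{
  "type": "kill",
  "id": "kill_wikipedia_2023",
  "dataSource": "wikipedia",
  "interval": "2023-01-01/2023-07-01",
  "batchSize": 500
}
```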

**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.

@@ -23,7 +23,10 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
@@ -60,15 +63,35 @@
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

/**
* Default nuke batch size. This is a small enough size that we still get value from batching, while
* yielding as quickly as possible. In one real cluster environment backed by MySQL (~2000 rows/sec),
* a batch size of 100 means a batch should hold the task lock for less than a second, and depending
* on the segment store latency, unoptimised S3 cleanups typically take 5-10 seconds per 100 segments.
* Over time we expect the S3 cleanup to get quicker, so this should drop below 1 second, meaning we
* yield the task lockbox every 1-2 seconds.
*/
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

private final boolean markAsUnused;

/**
* Split processing into batches to try to keep each nuke operation relatively short, in case either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;

// counter included primarily for testing
private long numBatchesProcessed = 0;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") Boolean markAsUnused
@JsonProperty("markAsUnused") Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize
)
{
super(
@@ -78,6 +101,8 @@
context
);
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
Preconditions.checkArgument(this.batchSize > 0, "batchSize should be greater than zero");
}

@JsonProperty
@@ -87,6 +112,13 @@
return markAsUnused;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getBatchSize()
{
return batchSize;
}

@Override
public String getType()
{
@@ -101,6 +133,13 @@
return ImmutableSet.of();
}

@JsonIgnore
@VisibleForTesting
long getNumBatchesProcessed()
{
return numBatchesProcessed;
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
@@ -114,22 +153,48 @@
}

// List unused segments
final List<DataSegment> unusedSegments = toolbox
final List<DataSegment> allUnusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));

if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);

// The individual activities on the toolbox may run for a long time, since they involve calls to metadata
// storage and archival object storage, and the task holds the task lockbox while it runs. By splitting the
// segment list into smaller batches, we get an opportunity to yield the lock to other activity that needs
// the Overlord's TaskLockbox.

LOG.info("Running kill task[%s] for dataSource[%s] and interval[%s]. Killing total [%,d] unused segments in [%d] batches(batchSize = [%d]).",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);

for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
"Locks[%s] for task[%s] can't cover segments[%s]",
taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
getId(),
unusedSegments
);
}

// Kill segments. Order is important here: we want the nuke action to clean up the metadata records
// _before_ the segments are removed from storage. This helps ensure that a segment always exists in
// deep storage while its metadata record is present. If the segment nuke throws an exception, the
// segment cleanup is abandoned.

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
numBatchesProcessed++;

if (numBatchesProcessed % 10 == 0) {
LOG.info("Processed [%d/%d] batches for kill task[%s].",
numBatchesProcessed, unusedSegmentBatches.size(), getId());
}
}

// Kill segments
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
LOG.info("Finished kill task[%s] for dataSource[%s] and interval[%s]. Deleted total [%,d] unused segments in [%d] batches.",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());

return TaskStatus.success(getId());
}
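
As an aside for reviewers, here is a minimal standalone sketch of the partition-and-batch pattern the new loop relies on. Only Guava's `Lists.partition` is the API actually used in this change; the class name `PartitionSketch` and the string segment IDs are hypothetical stand-ins for illustration. It also shows why four segments with `batchSize = 3` produce two batches, matching the expectation in `testKillBatchSizeThree` below.

```java
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

public class PartitionSketch
{
  public static void main(String[] args)
  {
    // Hypothetical stand-in for the unused-segment list returned by RetrieveUnusedSegmentsAction.
    final List<String> allUnusedSegments = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      allUnusedSegments.add("segment-" + i);
    }

    // Lists.partition returns consecutive sublists of the requested size; only the final
    // sublist may be smaller. With 4 elements and batchSize = 3 the result is [3, 1],
    // i.e. two batches.
    final int batchSize = 3;
    final List<List<String>> unusedSegmentBatches = Lists.partition(allUnusedSegments, batchSize);
    System.out.println("batches = " + unusedSegmentBatches.size()); // prints: batches = 2

    for (final List<String> batch : unusedSegmentBatches) {
      // In the real task, each iteration first submits a SegmentNukeAction for the batch
      // (cleaning up metadata) and only then deletes the batch from deep storage, yielding
      // the task lockbox to other Overlord activity between batches.
      System.out.println("processing a batch of " + batch.size() + " segments");
    }
  }
}
```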
@@ -51,14 +51,35 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
true
true,
99
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
}

@Test
public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefaultBatchSize() throws IOException
{
final ClientKillUnusedSegmentsTaskQuery taskQuery = new ClientKillUnusedSegmentsTaskQuery(
"killTaskId",
"datasource",
Intervals.of("2020-01-01/P1D"),
true,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Assert.assertEquals(taskQuery.getId(), fromJson.getId());
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
}

@Test
@@ -69,7 +90,8 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
true
true,
99
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
@@ -80,5 +102,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
}
}
@@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Set;

@@ -79,7 +80,8 @@ public void testKill() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
false
false,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -95,6 +97,7 @@ public void testKill() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}


@@ -124,7 +127,8 @@ public void testKillWithMarkUnused() throws Exception
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());
@@ -140,6 +144,7 @@ public void testKillWithMarkUnused() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}

@Test
@@ -151,11 +156,82 @@ public void testGetInputSourceResources()
DATA_SOURCE,
Intervals.of("2019-03-01/2019-04-01"),
null,
true
true,
null
);
Assert.assertTrue(task.getInputSourceResources().isEmpty());
}

@Test
public void testKillBatchSizeOne() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);

Assert.assertEquals(segments, announced);

final KillUnusedSegmentsTask task =
new KillUnusedSegmentsTask(
null,
DATA_SOURCE,
Intervals.of("2018-01-01/2020-01-01"),
null,
true,
1
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());

// we expect ALL segments to be deleted

final List<DataSegment> unusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(4L, task.getNumBatchesProcessed());
}

@Test
public void testKillBatchSizeThree() throws Exception
{
final String version = DateTimes.nowUtc().toString();
final Set<DataSegment> segments = ImmutableSet.of(
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-02-01/2019-03-01"), version),
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);

Assert.assertEquals(segments, announced);

final KillUnusedSegmentsTask task =
new KillUnusedSegmentsTask(
null,
DATA_SOURCE,
Intervals.of("2018-01-01/2020-01-01"),
null,
true,
3
);

Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode());

// we expect ALL segments to be deleted

final List<DataSegment> unusedSegments =
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(2L, task.getNumBatchesProcessed());
}

private static DataSegment newSegment(Interval interval, String version)
{
return new DataSegment(
@@ -948,14 +948,16 @@ public DataSegment apply(String input)
"test_kill_task",
Intervals.of("2011-04-01/P4D"),
null,
false
false,
null
);

final TaskStatus status = runTask(killUnusedSegmentsTask);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
Assert.assertEquals("delete segment batch call count", 1, mdc.getDeleteSegmentsCount());
Assert.assertTrue(
"expected unused segments get killed",
expectedUnusedSegments.containsAll(mdc.getNuked()) && mdc.getNuked().containsAll(
@@ -49,6 +49,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
private final Set<DataSegment> published = Sets.newConcurrentHashSet();
private final Set<DataSegment> nuked = Sets.newConcurrentHashSet();
private final List<DataSegment> unusedSegments;
private int deleteSegmentsCount = 0;

public TestIndexerMetadataStorageCoordinator()
{
@@ -201,6 +202,7 @@ public int deletePendingSegments(String dataSource)
@Override
public void deleteSegments(Set<DataSegment> segments)
{
deleteSegmentsCount++;
nuked.addAll(segments);
}

@@ -220,6 +222,11 @@ public Set<DataSegment> getNuked()
return ImmutableSet.copyOf(nuked);
}

public int getDeleteSegmentsCount()
{
return deleteSegmentsCount;
}

public void setUnusedSegments(List<DataSegment> newUnusedSegments)
{
synchronized (unusedSegments) {