split KillUnusedSegmentsTask into smaller batches
Processing in smaller chunks lets the task execution yield the TaskLockbox lock, so the
overlord can remain responsive to other tasks and users while this particular kill task
is executing.
jasonk000 committed Jul 27, 2023
1 parent 9a9038c · commit a372da0
Showing 1 changed file with 31 additions and 12 deletions.
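
Before the diff itself, here is a minimal, self-contained sketch of the batching pattern the change introduces. It assumes only Guava's Lists.partition; the String segment ids and the killBatch helper are hypothetical stand-ins for the real task-action and storage-killer calls shown in the diff below.

import com.google.common.collect.Lists;

import java.util.List;

public class BatchedKillSketch
{
  // Keep each nuke/kill round trip short even when the metadata store or
  // deep storage is slow; the real task uses a batch size of 10,000.
  private static final int BATCH_SIZE = 10_000;

  // The String ids and killBatch are placeholders for illustration only.
  static void killInBatches(List<String> allUnusedSegmentIds)
  {
    // Lists.partition returns consecutive sublist views of the original list.
    for (List<String> batch : Lists.partition(allUnusedSegmentIds, BATCH_SIZE)) {
      // Each batch is a shorter, independent unit of work, so a lock that is
      // held per unit of work can be released and re-acquired between batches.
      killBatch(batch);
    }
  }

  static void killBatch(List<String> batch)
  {
    // In the real task: remove the metadata records first, then delete the
    // segment files from deep storage.
  }
}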
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
@@ -60,6 +61,10 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
 {
   private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);
 
+  // We split this to try and keep each nuke operation relatively short, in the case that either
+  // the database or the storage layer is particularly slow.
+  private static final int SEGMENT_NUKE_BATCH_SIZE = 10_000;
+
   private final boolean markAsUnused;
 
   @JsonCreator
@@ -114,23 +119,37 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }
 
     // List unused segments
-    final List<DataSegment> unusedSegments = toolbox
+    final List<DataSegment> allUnusedSegments = toolbox
         .getTaskActionClient()
         .submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval()));
 
-    if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
-      throw new ISE(
-          "Locks[%s] for task[%s] can't cover segments[%s]",
-          taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
-          getId(),
-          unusedSegments
-      );
-    }
+    final List<List<DataSegment>> unusedSegmentBatches = Lists.partition(allUnusedSegments, SEGMENT_NUKE_BATCH_SIZE);
+
+    // The individual activities here on the toolbox have possibility to run for a longer period of time,
+    // since they involve calls to metadata storage and archival object storage. And, the tasks take hold of the
+    // task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
+    // lock to other activity that might need to happen using the overlord tasklockbox.
+
+    for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
+      if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
+        throw new ISE(
+            "Locks[%s] for task[%s] can't cover segments[%s]",
+            taskLockMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
+            getId(),
+            unusedSegments
+        );
+      }
 
-    // Kill segments
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
-    toolbox.getDataSegmentKiller().kill(unusedSegments);
+      // Kill segments:
+      // Order is important here: we want the nuke action to clean up the metadata records _before_ the
+      // segments are removed from storage, this helps maintain that we will always have a storage segment if
+      // the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
+      // abandoned.
+
+      toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
+      toolbox.getDataSegmentKiller().kill(unusedSegments);
+    }
 
     return TaskStatus.success(getId());
   }
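
As a usage-level note on the batching above: Guava's Lists.partition produces consecutive sublist views of the original list, and only the final batch may be smaller than SEGMENT_NUKE_BATCH_SIZE. A small example of that behavior (a sketch assuming only Guava on the classpath, with placeholder integers standing in for segments):

import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;

public class PartitionSizesExample
{
  public static void main(String[] args)
  {
    // 25,000 placeholder elements stand in for unused segments.
    List<Integer> segments = Collections.nCopies(25_000, 0);
    List<List<Integer>> batches = Lists.partition(segments, 10_000);

    // Prints "3 batches: 10000 10000 5000"
    StringBuilder sizes = new StringBuilder(batches.size() + " batches:");
    for (List<Integer> batch : batches) {
      sizes.append(' ').append(batch.size());
    }
    System.out.println(sizes);
  }
}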
