Skip to content

Commit

Permalink
split KillUnusedSegmentsTask into smaller batches (PR feedback)
Browse files Browse the repository at this point in the history
* provide a longer explanation for kill task batchSize parameter

* add logging details for kill batch progress

* javadoc and field name changes
  • Loading branch information
jasonk000 committed Jul 29, 2023
1 parent ffc0659 commit d3d5a71
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
8 changes: 7 additions & 1 deletion docs/data-management/delete.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,16 @@ The available grammar is:
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
"batchSize": <optional batch size, default is 100. Too large will affect overlord stability.>
"batchSize": <optional_batch size>
}
```

Special parameter explanations:

| Parameter |Default| Explanation |
|--------------|-------|--------------------------------------------------------------------------------------------------------|
| batchSize |100 | Splits the deletion of metadata records and deep-storage segments into smaller batches for processing. The task lockbox is locked during the operation, preventing other segment operations. Splitting the work into batches allows the kill task to yield to other task lockbox operations.|

**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
deep storage. This operation cannot be undone.

Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,17 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
private static final Logger LOG = new Logger(KillUnusedSegmentsTask.class);

// We split this to try and keep each nuke operation relatively short, in the case that either
// the database or the storage layer is particularly slow.
private static final int DEFAULT_SEGMENT_NUKE_BATCH_SIZE = 100;

private final boolean markAsUnused;

/** Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
*/
private final int batchSize;

private long countBatchesIssued = 0;
// counter included primarily for testing
private long numBatchesProcessed = 0;

@JsonCreator
public KillUnusedSegmentsTask(
Expand Down Expand Up @@ -123,9 +126,9 @@ public Set<ResourceAction> getInputSourceResources()

@JsonIgnore
@VisibleForTesting
long getCountBatchesIssued()
long getNumBatchesProcessed()
{
return countBatchesIssued;
return numBatchesProcessed;
}

@Override
Expand All @@ -152,6 +155,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// task lockbox to run. By splitting the segment list into smaller batches, we have an opportunity to yield the
// lock to other activity that might need to happen using the overlord tasklockbox.

LOG.info("kill starting: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d], ([%d] segments per batch)",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size(), batchSize);

for (final List<DataSegment> unusedSegments : unusedSegmentBatches) {
if (!TaskLocks.isLockCoversSegments(taskLockMap, unusedSegments)) {
throw new ISE(
Expand All @@ -162,17 +168,25 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
);
}

// Kill segments:
// Kill segments
// Order is important here: we want the nuke action to clean up the metadata records _before_ the
// segments are removed from storage, this helps maintain that we will always have a storage segment if
// the metadata segment is present. If the segment nuke throws an exception, then the segment cleanup is
// abandoned.

toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
toolbox.getDataSegmentKiller().kill(unusedSegments);
countBatchesIssued++;
numBatchesProcessed++;

Check failure

Code scanning / CodeQL

User-controlled data in arithmetic expression — High

This arithmetic expression depends on a user-provided value, potentially causing an overflow.

if (numBatchesProcessed % 10 == 0) {
LOG.info("kill progress: id [%s] dataSource [%s] batch progress: [%d/%d]",
getId(), getDataSource(), numBatchesProcessed, allUnusedSegments.size());
}
}

LOG.info("kill complete: id [%s] dataSource [%s] interval [%s], total segments [%d], batches [%d]",
getId(), getDataSource(), getInterval(), allUnusedSegments.size(), unusedSegmentBatches.size());

return TaskStatus.success(getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testKill() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getCountBatchesIssued());
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}


Expand Down Expand Up @@ -144,7 +144,7 @@ public void testKillWithMarkUnused() throws Exception
newSegment(Intervals.of("2019-01-01/2019-02-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
Assert.assertEquals(1L, task.getCountBatchesIssued());
Assert.assertEquals(1L, task.getNumBatchesProcessed());
}

@Test
Expand Down Expand Up @@ -194,7 +194,7 @@ public void testKillBatchSizeOne() throws Exception
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(4L, task.getCountBatchesIssued());
Assert.assertEquals(4L, task.getNumBatchesProcessed());
}

@Test
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testKillBatchSizeThree() throws Exception
getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(DATA_SOURCE, Intervals.of("2019/2020"));

Assert.assertEquals(Collections.emptyList(), unusedSegments);
Assert.assertEquals(2L, task.getCountBatchesIssued());
Assert.assertEquals(2L, task.getNumBatchesProcessed());
}

private static DataSegment newSegment(Interval interval, String version)
Expand Down

0 comments on commit d3d5a71

Please sign in to comment.