
Commit

Add an option for ingestion task to drop (mark unused) all existing segments that are contained by interval in the ingestionSpec (#11025)

* Auto-Compaction can run indefinitely when segmentGranularity is changed from coarser to finer.

* Add option to drop segments after ingestion

* fix checkstyle

* add tests

* add tests

* add tests

* fix test

* add tests

* fix checkstyle

* fix checkstyle

* add docs

* fix docs

* address comments

* address comments

* fix spelling
maytasm authored Apr 1, 2021
1 parent 67dd61e commit d7f5293
Showing 51 changed files with 1,663 additions and 144 deletions.
4 changes: 3 additions & 1 deletion docs/ingestion/compaction.md
Expand Up @@ -52,7 +52,9 @@ In cases where you require more control over compaction, you can manually submit
See [Setting up a manual compaction task](#setting-up-manual-compaction) for more about manual compaction tasks.

## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.

For compaction tasks, `dropExisting` is set to `true` for the underlying ingestion tasks. This means that Druid can drop (mark unused) all existing segments that are fully contained within the compaction task's interval. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations).
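For instance, a manual compaction task that reindexes a year of data at the finer MONTH `segmentGranularity` might look like the sketch below. This is a minimal sketch rather than a complete spec: the `wikipedia` datasource name and the interval are illustrative assumptions. Because the underlying ingestion tasks run with `dropExisting` set to `true`, the original YEAR segment is marked unused once the new MONTH segments publish.

```json
{
  "type": "compact",
  "dataSource": "wikipedia",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2020-01-01/2021-01-01"
    }
  },
  "granularitySpec": {
    "segmentGranularity": "MONTH"
  }
}
```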

If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks, you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjust the auto-compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to a higher priority than the ingestion task.
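As a sketch of that configuration, a compaction dynamic configuration entry for a single datasource could set both the offset and the task priority. The datasource name and the specific values here are illustrative assumptions; see the compaction dynamic configuration reference linked above for the full set of fields.

```json
{
  "dataSource": "wikipedia",
  "skipOffsetFromLatest": "P1D",
  "taskPriority": 30
}
```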

Expand Down
35 changes: 32 additions & 3 deletions docs/ingestion/native-batch.md
Expand Up @@ -74,7 +74,7 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x

- [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)

You may want to consider the below things:
### Implementation considerations

- You may want to control the amount of input data each worker task processes. This can be
controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details).
Expand All @@ -89,7 +89,33 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.

- You can set the `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop (mark unused) all existing segments that
start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments.
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. A minimal spec sketch follows the examples below.

The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:

- Example 1: Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and YEAR `segmentGranularity`. You want to
  overwrite the whole interval of 2020-01-01 to 2021-01-01 with new data using the finer `segmentGranularity` of MONTH.
  If the replacement data does not have a record for every month from 2020-01-01 to 2021-01-01,
  Druid cannot drop the original YEAR segment, even though the new data is intended to replace it entirely. Set `dropExisting` to true in this case to drop
  the original segment at YEAR `segmentGranularity`, since you no longer need it.
- Example 2: Consider the case where you want to re-ingest or overwrite a datasource and the new data does not contain some time intervals that exist
  in the datasource. For example, a datasource contains the following data at MONTH `segmentGranularity`:
  - January: 1 record
  - February: 10 records
  - March: 10 records
  You want to re-ingest and overwrite with new data as follows:
  - January: 0 records
  - February: 10 records
  - March: 9 records
  Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same MONTH `segmentGranularity` would be:
  - January: 1 record
  - February: 10 records
  - March: 9 records
  This is incorrect, since the new data has 0 records for January. Set `dropExisting` to true to drop the original
  segment for January, which is no longer needed because the newly ingested data has no records for January.
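The sketch below shows the relevant parts of a minimal `index_parallel` spec with `dropExisting` enabled, matching the examples above. The `wikipedia` datasource name, the local input paths, and the timestamp column are illustrative assumptions; the parts that matter for this feature are `appendToExisting`, `dropExisting`, and the `granularitySpec` intervals.

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "timestampSpec": { "column": "timestamp", "format": "iso" },
      "dimensionsSpec": { "dimensions": [] },
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "MONTH",
        "intervals": ["2020-01-01/2021-01-01"]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": { "type": "local", "baseDir": "/data/wikipedia", "filter": "*.json" },
      "inputFormat": { "type": "json" },
      "appendToExisting": false,
      "dropExisting": true
    }
  }
}
```

With a spec along these lines, any existing segments whose intervals start and end within 2020-01-01/2021-01-01 are marked unused when the task publishes its new MONTH segments.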

### Task syntax

A sample task is shown below:
Expand Down Expand Up @@ -193,6 +219,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|

### `tuningConfig`

Expand Down Expand Up @@ -538,7 +565,8 @@ An example of the result is
"l_comment"
]
},
"appendToExisting": false
"appendToExisting": false,
"dropExisting": false
},
"tuningConfig": {
"type": "index_parallel",
Expand Down Expand Up @@ -719,6 +747,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|

### `tuningConfig`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,22 @@
*/
public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
{
/**
* Set of segments that were fully overshadowed by new segments, {@link SegmentTransactionalInsertAction#segments}
*/
@Nullable
private final Set<DataSegment> segmentsToBeOverwritten;
/**
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
/**
* Set of segments to be dropped (marked unused) when new segments, {@link SegmentTransactionalInsertAction#segments},
* are inserted into metadata storage.
*/
@Nullable
private final Set<DataSegment> segmentsToBeDropped;

@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
Expand All @@ -66,10 +79,11 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli

public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
@Nullable Set<DataSegment> segmentsToBeDropped,
Set<DataSegment> segmentsToPublish
)
{
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
}

public static SegmentTransactionalInsertAction appendAction(
Expand All @@ -78,7 +92,7 @@ public static SegmentTransactionalInsertAction appendAction(
@Nullable DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null);
return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null);
}

public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
Expand All @@ -87,19 +101,21 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource);
return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource);
}

@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
@JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
@JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
this.segmentsToBeDropped = segmentsToBeDropped;
this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
Expand All @@ -113,6 +129,13 @@ public Set<DataSegment> getSegmentsToBeOverwritten()
return segmentsToBeOverwritten;
}

@JsonProperty
@Nullable
public Set<DataSegment> getSegmentsToBeDropped()
{
return segmentsToBeDropped;
}

@JsonProperty
public Set<DataSegment> getSegments()
{
Expand Down Expand Up @@ -176,6 +199,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
}
if (segmentsToBeDropped != null) {
allSegments.addAll(segmentsToBeDropped);
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);

if (segmentsToBeOverwritten != null && !segmentsToBeOverwritten.isEmpty()) {
Expand All @@ -194,6 +221,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
segmentsToBeDropped,
startMetadata,
endMetadata
)
Expand Down Expand Up @@ -305,7 +333,8 @@ public String toString()
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource=" + dataSource +
", dataSource='" + dataSource + '\'' +
", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -481,6 +482,28 @@ public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnota
}
}

public static Set<DataSegment> getUsedSegmentsWithinInterval(
TaskToolbox toolbox,
String dataSource,
List<Interval> intervals
) throws IOException
{
Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
if (!intervals.isEmpty()) {
Collection<DataSegment> usedSegment = toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE));
for (DataSegment segment : usedSegment) {
for (Interval interval : condensedIntervals) {
if (interval.contains(segment.getInterval())) {
segmentsFoundForDrop.add(segment);
break;
}
}
}
}
return segmentsFoundForDrop;
}

@Nullable
static Granularity findGranularityFromSegments(List<DataSegment> segments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,17 @@ public TaskStatus run(final TaskToolbox toolbox)
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);

final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptySegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) {
final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptySegments)
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ private static ParallelIndexIOConfig createIoConfig(
toolbox.getConfig()
),
null,
false
false,
true
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,14 @@ private TaskStatus generateAndPublishSegments(
throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
Set<DataSegment> segmentsFoundForDrop = null;
if (ingestionSchema.getIOConfig().isDropExisting()) {
segmentsFoundForDrop = getUsedSegmentsWithinInterval(toolbox, getDataSource(), ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals());
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish));

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
Expand Down Expand Up @@ -910,7 +915,7 @@ private TaskStatus generateAndPublishSegments(

// Probably we can publish atomicUpdateGroup along with segments.
final SegmentsAndCommitMetadata published =
awaitPublish(driver.publishAll(inputSegments, publisher, annotateFunction), pushTimeout);
awaitPublish(driver.publishAll(inputSegments, segmentsFoundForDrop, publisher, annotateFunction), pushTimeout);
appenderator.close();

ingestionState = IngestionState.COMPLETED;
Expand Down Expand Up @@ -1028,18 +1033,21 @@ public IndexTuningConfig getTuningConfig()
public static class IndexIOConfig implements BatchIOConfig
{
private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
private static final boolean DEFAULT_DROP_EXISTING = false;

private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
private final InputFormat inputFormat;
private final boolean appendToExisting;
private final boolean dropExisting;

@JsonCreator
public IndexIOConfig(
@Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
Checks.checkOneNotNullOrEmpty(
Expand All @@ -1052,13 +1060,18 @@ public IndexIOConfig(
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting;
this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
if (this.dropExisting && this.appendToExisting) {
throw new IAE("Cannot both drop existing segments and append to existing segments. "
+ "Either dropExisting or appendToExisting should be set to false");
}
}

// old constructor for backward compatibility
@Deprecated
public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting)
public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting, @Nullable Boolean dropExisting)
{
this(firehoseFactory, null, null, appendToExisting);
this(firehoseFactory, null, null, appendToExisting, dropExisting);
}

@Nullable
Expand Down Expand Up @@ -1113,6 +1126,13 @@ public boolean isAppendToExisting()
{
return appendToExisting;
}

@Override
@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}
}

public static class IndexTuningConfig implements AppenderatorConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ final SubTaskSpec<T> newTaskSpec(InputSplit split)
firehoseFactory,
inputSource,
ingestionSchema.getIOConfig().getInputFormat(),
ingestionSchema.getIOConfig().isAppendToExisting()
ingestionSchema.getIOConfig().isAppendToExisting(),
ingestionSchema.getIOConfig().isDropExisting()
),
ingestionSchema.getTuningConfig()
);
Expand Down