Add an option for ingestion task to drop (mark unused) all existing segments that are contained by interval in the ingestionSpec #11025

Merged · 17 commits · Apr 1, 2021
4 changes: 3 additions & 1 deletion docs/ingestion/compaction.md
@@ -52,7 +52,9 @@ In cases where you require more control over compaction, you can manually submit
See [Setting up a manual compaction task](#setting-up-manual-compaction) for more about manual compaction tasks.

## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the compacted set. During compaction Druid locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.

For compaction tasks, `dropExisting` for the underlying ingestion tasks is `true`. This means that Druid can drop (mark unused) all existing segments that fall fully within the compaction task's interval. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations).

If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks, you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjust the auto-compaction starting point from the current time to reduce the chance of conflicts between ingestion and compaction. See [Compaction dynamic configuration](../configuration/index.md#compaction-dynamic-configuration) for more information. Another option is to set the compaction task to a higher priority than the ingestion task.

35 changes: 32 additions & 3 deletions docs/ingestion/native-batch.md
@@ -74,7 +74,7 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x

- [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)

You may want to consider the below things:
### Implementation considerations

- You may want to control the amount of input data each worker task processes. This can be
controlled using different configurations depending on the phase in parallel ingestion (see [`partitionsSpec`](#partitionsspec) for more details).
@@ -89,7 +89,33 @@ You may want to consider the below things:
data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
no data written by this task, they will be left alone. If any existing segments partially overlap with the
`granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.

- You can set the `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop (mark unused) all existing segments that
start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments.
`dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`.

The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`; a short Java sketch of the underlying containment rule follows the examples:

- Example 1: Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and `YEAR` `segmentGranularity`. You want to
  overwrite the whole interval of 2020-01-01 to 2021-01-01 with new data using the finer `segmentGranularity` of `MONTH`.
  If the replacement data does not have a record for every month from 2020-01-01 to 2021-01-01,
  Druid cannot drop the original `YEAR` segment even though it fully covers the replacement interval. Set `dropExisting` to true in this case to drop
  the original segment at `YEAR` `segmentGranularity` since you no longer need it.
- Example 2: Consider the case where you want to re-ingest or overwrite a datasource and the new data does not contain some time intervals that exist
  in the datasource. For example, a datasource contains the following data at `MONTH` `segmentGranularity`:
  - January: 1 record
  - February: 10 records
  - March: 10 records

  You want to re-ingest and overwrite with new data as follows:
  - January: 0 records
  - February: 10 records
  - March: 9 records

  Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same `MONTH` `segmentGranularity` would be:
  - January: 1 record
  - February: 10 records
  - March: 9 records

  This is incorrect because the new data has 0 records for January. Set `dropExisting` to true to drop the original
  segment for January, which is no longer needed since the newly ingested data has no records for January.
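
The following sketch is not part of the Druid documentation; it restates the containment rule behind the two examples using Joda-Time intervals, the interval type Druid uses internally. A segment is eligible to be dropped only when its interval both starts and ends within one of the `granularitySpec` intervals. The class and variable names are hypothetical.

```java
import org.joda.time.Interval;

// Rough illustration of the dropExisting containment rule; only the Interval
// arithmetic mirrors Druid's behavior.
public class DropExistingSketch
{
  public static void main(String[] args)
  {
    Interval specInterval = Interval.parse("2020-01-01/2021-01-01");       // granularitySpec interval

    Interval oldYearSegment = Interval.parse("2020-01-01/2021-01-01");     // Example 1: original YEAR segment
    Interval oldJanuarySegment = Interval.parse("2020-01-01/2020-02-01");  // Example 2: original January MONTH segment
    Interval straddlingSegment = Interval.parse("2019-07-01/2020-07-01");  // overlaps the spec interval but is not contained

    // true: the YEAR segment is fully contained, so dropExisting = true marks it unused
    System.out.println(specInterval.contains(oldYearSegment));
    // true: the January segment is dropped even though the new data has no January records
    System.out.println(specInterval.contains(oldJanuarySegment));
    // false: a segment that only partially overlaps the spec interval is never dropped
    System.out.println(specInterval.contains(straddlingSegment));
  }
}
```

The check this PR adds (`getUsedSegmentsWithinInterval`, shown further down) applies the same `Interval.contains` test against the condensed `granularitySpec` intervals.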

### Task syntax

A sample task is shown below:
@@ -193,6 +219,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in the `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|

### `tuningConfig`

@@ -538,7 +565,8 @@ An example of the result is
"l_comment"
]
},
"appendToExisting": false
"appendToExisting": false,
"dropExisting": false
},
"tuningConfig": {
"type": "index_parallel",
@@ -719,6 +747,7 @@ that range if there's some stray data with unexpected timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an `interval`, then the ingestion task drops (marks unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in the `granularitySpec`, Druid does not drop any segments even if `dropExisting` is `true`.|false|no|

### `tuningConfig`

@@ -54,9 +54,22 @@
*/
public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
{
/**
* Set of segments that was fully overshadowed by new segments, {@link SegmentTransactionalInsertAction#segments}
*/
@Nullable
private final Set<DataSegment> segmentsToBeOverwritten;
/**
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;
/**
* Set of segments to be dropped (mark unused) when new segments, {@link SegmentTransactionalInsertAction#segments},
* are inserted into metadata storage.
*/
@Nullable
private final Set<DataSegment> segmentsToBeDropped;

@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
@@ -66,10 +79,11 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli

public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
@Nullable Set<DataSegment> segmentsToBeDropped,
Set<DataSegment> segmentsToPublish
)
{
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null);
return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToBeDropped, segmentsToPublish, null, null, null);
}

public static SegmentTransactionalInsertAction appendAction(
@@ -78,7 +92,7 @@ public static SegmentTransactionalInsertAction appendAction(
@Nullable DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, segments, startMetadata, endMetadata, null);
return new SegmentTransactionalInsertAction(null, null, segments, startMetadata, endMetadata, null);
}

public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
@@ -87,19 +101,21 @@ public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalInsertAction(null, null, startMetadata, endMetadata, dataSource);
return new SegmentTransactionalInsertAction(null, null, null, startMetadata, endMetadata, dataSource);
}

@JsonCreator
private SegmentTransactionalInsertAction(
@JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> segmentsToBeOverwritten,
@JsonProperty("segmentsToBeDropped") @Nullable Set<DataSegment> segmentsToBeDropped,
@JsonProperty("segments") @Nullable Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
@JsonProperty("dataSource") @Nullable String dataSource
)
{
this.segmentsToBeOverwritten = segmentsToBeOverwritten;
this.segmentsToBeDropped = segmentsToBeDropped;
this.segments = segments == null ? ImmutableSet.of() : ImmutableSet.copyOf(segments);
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
@@ -113,6 +129,13 @@ public Set<DataSegment> getSegmentsToBeOverwritten()
return segmentsToBeOverwritten;
}

@JsonProperty
@Nullable
public Set<DataSegment> getSegmentsToBeDropped()
{
return segmentsToBeDropped;
}

@JsonProperty
public Set<DataSegment> getSegments()
{
@@ -176,6 +199,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
if (segmentsToBeOverwritten != null) {
allSegments.addAll(segmentsToBeOverwritten);
}
if (segmentsToBeDropped != null) {
allSegments.addAll(segmentsToBeDropped);
}
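// The lock check below must cover the segments being dropped as well, not just those being published or overwritten.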

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), allSegments);

if (segmentsToBeOverwritten != null && !segmentsToBeOverwritten.isEmpty()) {
@@ -194,6 +221,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
segmentsToBeDropped,
startMetadata,
endMetadata
)
@@ -305,7 +333,8 @@ public String toString()
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
", dataSource=" + dataSource +
", dataSource='" + dataSource + '\'' +
", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
'}';
}
}
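
To show how the extended factory method is meant to be used, here is a hypothetical helper that is not part of this PR; the imports and package names are assumed from the Druid codebase, and the real call site is the publisher in `IndexTask` further down.

```java
import java.io.IOException;
import java.util.Set;

import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.timeline.DataSegment;

// Hypothetical sketch: publish new segments while atomically overshadowing one set of old
// segments and dropping (marking unused) another.
final class OverwritePublishSketch
{
  static SegmentPublishResult publish(
      TaskActionClient client,
      Set<DataSegment> segmentsToBeOverwritten, // fully overshadowed by the new segments
      Set<DataSegment> segmentsToBeDropped,     // fully contained by the spec interval (dropExisting = true)
      Set<DataSegment> segmentsToPublish        // the newly built segments
  ) throws IOException
  {
    final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.overwriteAction(
        segmentsToBeOverwritten,
        segmentsToBeDropped,
        segmentsToPublish
    );
    // perform() verifies that the task's locks cover every segment involved, then inserts the new
    // segments and marks the dropped set unused in a single metadata transaction.
    return client.submit(action);
  }
}
```

Because the insert and the drop happen in one transaction, a failed publish leaves the dropped set untouched, which matches the documentation note above that Druid does not drop or mark unused any segments when ingestion fails.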
@@ -67,6 +67,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -481,6 +482,28 @@ public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnota
}
}

public static Set<DataSegment> getUsedSegmentsWithinInterval(
TaskToolbox toolbox,
String dataSource,
List<Interval> intervals
) throws IOException
{
Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
if (!intervals.isEmpty()) {
Collection<DataSegment> usedSegment = toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE));
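// RetrieveUsedSegmentsAction returns every used segment that overlaps the requested intervals,
// so containment is re-checked below: only segments that start and end within one of the
// condensed intervals are eligible to be dropped (marked unused).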
for (DataSegment segment : usedSegment) {
for (Interval interval : condensedIntervals) {
if (interval.contains(segment.getInterval())) {

Contributor: I would think that the segments returned from the RetrieveUsedSegmentsAction are guaranteed to be within the specified interval; is that not the case? If so, do we need to check again that the segment is in the interval?

Contributor Author: That is not the case. For example, if you have a segment with the interval 2000-01-01/2001-01-01 and the specified interval is 2000-04-28/2000-04-29, then if that segment has data for the day 2000-04-28/2000-04-29 it is returned by RetrieveUsedSegmentsAction. However, we cannot drop this segment, since the specified interval is only 2000-04-28/2000-04-29. We can only drop segments that start and end within the specified interval.

Contributor Author: Actually, let me make the doc a little clearer that we only drop segments that start and end within the interval specified in the granularitySpec.

Contributor: Oh I see, OK, makes sense.

segmentsFoundForDrop.add(segment);

Contributor: Could add a continue here once a segment is found, for efficiency.

Contributor Author: Good idea. By the way, I think it should be a break instead of a continue, to break out of the `for (Interval interval : condensedIntervals)` loop once we have confirmed that the segment is within one of the intervals specified in the granularitySpec.

Contributor: Ah yes, you're right, it should be a break.

break;
}
}
}
}
return segmentsFoundForDrop;
}

@Nullable
static Granularity findGranularityFromSegments(List<DataSegment> segments)
{
@@ -325,11 +325,17 @@ public TaskStatus run(final TaskToolbox toolbox)
int sequenceNumber = 0;
String sequenceName = makeSequenceName(getId(), sequenceNumber);

final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptySegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptySegments != null && !mustBeNullOrEmptySegments.isEmpty()) {
final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, mustBeNullOrEmptyDropSegments, segments, commitMetadata) -> {
if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to overwrite segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptySegments)
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments)
);
}
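// Streaming ingestion only appends, so a publish that tries to drop segments likewise indicates a bug upstream.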
if (mustBeNullOrEmptyDropSegments != null && !mustBeNullOrEmptyDropSegments.isEmpty()) {
throw new ISE(
"Stream ingestion task unexpectedly attempted to drop segments: %s",
SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyDropSegments)
);
}
final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction(
@@ -633,7 +633,8 @@ private static ParallelIndexIOConfig createIoConfig(
toolbox.getConfig()
),
null,
false
false,
true
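// dropExisting: always true for the compaction task's underlying ingestion spec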
);
}

@@ -854,9 +854,14 @@ private TaskStatus generateAndPublishSegments(
throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType());
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
Set<DataSegment> segmentsFoundForDrop = null;
if (ingestionSchema.getIOConfig().isDropExisting()) {
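// Note: getUsedSegmentsWithinInterval only returns segments that start and end within the
// spec's input intervals; segments that merely overlap an interval boundary are left untouched.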
segmentsFoundForDrop = getUsedSegmentsWithinInterval(toolbox, getDataSource(), ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals());
}

final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToDrop, segmentsToPublish));

String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
@@ -910,7 +915,7 @@

// Probably we can publish atomicUpdateGroup along with segments.
final SegmentsAndCommitMetadata published =
awaitPublish(driver.publishAll(inputSegments, publisher, annotateFunction), pushTimeout);
awaitPublish(driver.publishAll(inputSegments, segmentsFoundForDrop, publisher, annotateFunction), pushTimeout);
appenderator.close();

ingestionState = IngestionState.COMPLETED;
@@ -1028,18 +1033,21 @@ public IndexTuningConfig getTuningConfig()
public static class IndexIOConfig implements BatchIOConfig
{
private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
private static final boolean DEFAULT_DROP_EXISTING = false;

private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
private final InputFormat inputFormat;
private final boolean appendToExisting;
private final boolean dropExisting;

@JsonCreator
public IndexIOConfig(
@Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
Checks.checkOneNotNullOrEmpty(
@@ -1052,13 +1060,18 @@ public IndexIOConfig(
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting;
this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
if (this.dropExisting && this.appendToExisting) {
throw new IAE("Cannot both drop existing segments and append to existing segments. "
+ "Either dropExisting or appendToExisting should be set to false");
}
}

// old constructor for backward compatibility
@Deprecated
public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting)
public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting, @Nullable Boolean dropExisting)
{
this(firehoseFactory, null, null, appendToExisting);
this(firehoseFactory, null, null, appendToExisting, dropExisting);
}

@Nullable
@@ -1113,6 +1126,13 @@ public boolean isAppendToExisting()
{
return appendToExisting;
}

@Override
@JsonProperty
public boolean isDropExisting()
{
return dropExisting;
}
}

public static class IndexTuningConfig implements AppenderatorConfig
@@ -97,7 +97,8 @@ final SubTaskSpec<T> newTaskSpec(InputSplit split)
firehoseFactory,
inputSource,
ingestionSchema.getIOConfig().getInputFormat(),
ingestionSchema.getIOConfig().isAppendToExisting()
ingestionSchema.getIOConfig().isAppendToExisting(),
ingestionSchema.getIOConfig().isDropExisting()
),
ingestionSchema.getTuningConfig()
);