Skip to content

Commit

Permalink
Batch ingestion replace (apache#12137)
Browse files Browse the repository at this point in the history
* Tombstone support for replace functionality

* A used segment interval is the interval of a currently used segment that overlaps any of the input intervals for the spec

* Update compaction test to match replace behavior

* Adapt ITAutoCompactionTest to work with tombstones rather than dropping segments. Add support for tombstones in the broker.

* Style plus simple queriableindex test

* Add segment cache loader tombstone test

* Add more tests

* Add a method to the LogicalSegment to test whether it has any data

* Test filter with some empty logical segments

* Refactor more compaction/dropexisting tests

* Code coverage

* Support for all empty segments

* Skip tombstones when looking-up broker's timeline. Discard changes made to tool chest to avoid empty segments since they will no longer have empty segments after lookup because we are skipping over them.

* Fix null ptr when segment does not have a queriable index

* Add support for empty replace interval (all input data has been filtered out)

* Fixed coverage & style

* Find tombstone versions from lock versions

* Test failures & style

* Interner was making this fail since the two segments were considered equal due to their IDs being equal

* Cleanup tombstone version code

* Force timeChunkLock whenever replace (i.e. dropExisting=true) is being used

* Reject replace spec when input intervals are empty

* Documentation

* Style and unit test

* Restore test code deleted by mistake

* Allocate forces TIME_CHUNK locking and uses lock versions. TombstoneShardSpec added.

* Unused imports. Dead code. Test coverage.

* Coverage.

* Prevent killer from throwing an exception for tombstones. This is the killer used in the peon for killing segments.

* Fix OmniKiller + more test coverage.

* Tombstones are now marked using a shard spec

* Drop a segment factory.json in the segment cache for tombstones

* Style

* Style + coverage

* style

* Add TombstoneLoadSpec.class to mapper in test

* Update core/src/main/java/org/apache/druid/segment/loading/TombstoneLoadSpec.java

Typo

Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>

* Update docs/configuration/index.md

Missing

Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>

* Typo

* Integrated replace with an existing test since the replace part was redundant and, more importantly, the test file was very close to or exceeding the 10 min default "no output" CI Travis threshold.

* Range does not work with multi-dim

Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>
  • Loading branch information
2 people authored and TSFenwick committed Apr 11, 2022
1 parent b25cf28 commit 5be3e5b
Show file tree
Hide file tree
Showing 70 changed files with 2,721 additions and 342 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.initialization;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.segment.loading.TombstoneLoadSpec;

import java.util.List;

public class TombstoneDataStorageModule implements DruidModule
{
  public static final String SCHEME = "tombstone";

  @Override
  public List<? extends Module> getJacksonModules()
  {
    // Contribute a Jackson module whose only job is to register the tombstone
    // load-spec subtype so tombstone segments can be (de)serialized.
    final Module tombstoneModule = new Module()
    {
      @Override
      public String getModuleName()
      {
        return "DruidTombstoneStorage-" + System.identityHashCode(this);
      }

      @Override
      public Version version()
      {
        return Version.unknownVersion();
      }

      @Override
      public void setupModule(SetupContext context)
      {
        context.registerSubtypes(TombstoneLoadSpec.class);
      }
    };

    return ImmutableList.of(tombstoneModule);
  }

  @Override
  public void configure(Binder binder)
  {
    // No Guice bindings needed; this module only contributes a Jackson subtype.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import org.apache.druid.initialization.TombstoneDataStorageModule;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@JsonTypeName(TombstoneDataStorageModule.SCHEME)
public class TombstoneLoadSpec implements LoadSpec
{
  /**
   * "Loads" a tombstone segment. Tombstones have no data files to pull, so
   * loading consists of dropping a minimal factory.json into the segment cache
   * directory; its size (in bytes) is reported in the {@link LoadSpecResult}.
   */
  @Override
  public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
  {
    try {
      return new LoadSpecResult(writeFactoryFile(destDir));
    }
    catch (IOException e) {
      // NOTE(review): the IOException cause is not chained into the thrown
      // exception; consider preserving it if SegmentLoadingException supports
      // a cause, so the underlying I/O failure is not lost.
      throw new SegmentLoadingException(
          "Failed to create factory.json for tombstone in dir [%s]",
          destDir.getAbsolutePath()
      );
    }
  }

  /**
   * Writes the tombstone factory.json into {@code destDir}.
   *
   * @return the number of bytes written
   */
  @VisibleForTesting
  public static int writeFactoryFile(File destDir) throws IOException
  {
    final String factoryJSONString = "{\"type\":\"tombstoneSegmentFactory\"}";
    final byte[] contents = factoryJSONString.getBytes(StandardCharsets.UTF_8);
    final File factoryJson = new File(destDir, "factory.json");
    // Files.write creates the file if it does not exist, so the previous
    // createNewFile() call (whose return value was ignored) was redundant.
    Files.write(contents, factoryJson);
    // Report bytes actually written rather than the String's char count
    // (equal for this ASCII payload, but byte count is the correct measure).
    return contents.length;
  }
}
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/DataSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
@PublicApi
public class DataSegment implements Comparable<DataSegment>, Overshadowable<DataSegment>
{

public static final String TOMBSTONE_LOADSPEC_TYPE = "tombstone";

/*
* The difference between this class and org.apache.druid.segment.Segment is that this class contains the segment
* metadata only, while org.apache.druid.segment.Segment represents the actual body of segment data, queryable.
Expand Down Expand Up @@ -211,6 +214,7 @@ public DataSegment(
)
{
this.id = SegmentId.of(dataSource, interval, version, shardSpec);
// prune loadspec if needed
this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec);
// Deduplicating dimensions and metrics lists as a whole because they are very likely the same for the same
// dataSource
Expand Down Expand Up @@ -343,6 +347,11 @@ public SegmentId getId()
return id;
}

/**
 * Returns true when this segment is a tombstone, i.e. its shard spec type is
 * {@code ShardSpec.Type.TOMBSTONE}. Tombstones mark intervals replaced with
 * no data rather than carrying data themselves.
 */
public boolean isTombstone()
{
  return getShardSpec().getType().equals(ShardSpec.Type.TOMBSTONE);
}

@Override
public boolean overshadows(DataSegment other)
{
Expand Down Expand Up @@ -596,4 +605,11 @@ public DataSegment build()
);
}
}

@Override
public boolean hasData()
{
  // Tombstones are data-less placeholders; every other segment has data.
  return !isTombstone();
}

}
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/LogicalSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,24 @@
@PublicApi
public interface LogicalSegment
{

  // With tombstones, the broker and other consumers may need to distinguish
  // segments that carry data from those that do not. This is an enum rather
  // than a boolean so a third value (e.g. MISSING, for tombstones that should
  // have been loaded but were not) can be added later without an API break.
  enum Status
  {
    READY, /* Has data and is ready to serve */
    EMPTY /* Has no data (e.g. a tombstone) */
  }

  Interval getInterval();

  Interval getTrueInterval();

  // Default is READY so pre-tombstone implementations keep their behavior.
  default Status getStatus()
  {
    return Status.READY;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ default boolean containsRootPartition(T other)
* {@link VersionedIntervalTimeline}.
*/
short getAtomicUpdateGroupSize();

// Whether this overshadowable carries data rows. Defaults to true so only
// data-less implementations (e.g. tombstones) need to override it.
default boolean hasData()
{
  return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,13 @@ public String toString()
", object=" + object +
'}';
}

@Override
public LogicalSegment.Status getStatus()
{
  // Objects with no data rows (e.g. tombstones) surface as EMPTY.
  return object.hasData() ? Status.READY : Status.EMPTY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,20 @@ public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterato

private final Comparator<? super VersionType> versionComparator;

// Set this to true if the client needs to skip tombstones upon lookup (like the broker)
private boolean skipObjectsWithNoData = false;

/**
 * Creates a timeline whose lookups return all overlapping holders, including
 * those with no data (tombstones).
 */
public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
{
  this.versionComparator = versionComparator;
}

/**
 * @param versionComparator    comparator used to order versions within an interval
 * @param skipObjectsWithNoData when true, lookup skips partition holders that
 *                              contain no data (i.e. only tombstones)
 */
public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator, boolean skipObjectsWithNoData)
{
  this(versionComparator);
  this.skipObjectsWithNoData = skipObjectsWithNoData;
}

public static void addSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
Iterator<DataSegment> segments
Expand Down Expand Up @@ -743,7 +752,9 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
Interval timelineInterval = entry.getKey();
TimelineEntry val = entry.getValue();

if (timelineInterval.overlaps(interval)) {
// exclude empty partition holders (i.e. tombstones) since they do not add value
// for higher level code...they have no data rows...
if ((!skipObjectsWithNoData || val.partitionHolder.hasData()) && timelineInterval.overlaps(interval)) {
retVal.add(
new TimelineObjectHolder<>(
timelineInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,16 @@ public String toString()
"overshadowableManager=" + overshadowableManager +
'}';
}

/**
 * Returns true when at least one payload in this holder carries data; a holder
 * made up entirely of tombstones has none.
 */
public boolean hasData()
{
  boolean anyData = false;
  for (T payload : payloads()) {
    if (payload.hasData()) {
      anyData = true;
      break;
    }
  }
  return anyData;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = ShardSpec.Type.NONE, value = NoneShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.TOMBSTONE, value = TombstoneShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.SINGLE, value = SingleDimensionShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.RANGE, value = DimensionRangeShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.LINEAR, value = LinearShardSpec.class),
Expand Down Expand Up @@ -168,5 +169,7 @@ interface Type
String BUCKET_HASH = "bucket_hash";
String BUCKET_SINGLE_DIM = "bucket_single_dim";
String BUCKET_RANGE = "bucket_range";

String TOMBSTONE = "tombstone";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.google.common.base.Objects;

/**
 * Partition chunk for tombstone segments. A tombstone always occupies the
 * whole interval by itself: it is chunk 0, both start and end, and never
 * abuts another chunk. All tombstone chunks compare equal to each other
 * regardless of payload.
 */
public class TombstonePartitionedChunk<T> implements PartitionChunk<T>
{
  private final T object;

  public static <T> TombstonePartitionedChunk<T> make(T obj)
  {
    return new TombstonePartitionedChunk<T>(obj);
  }

  public TombstonePartitionedChunk(T object)
  {
    this.object = object;
  }

  @Override
  public T getObject()
  {
    return object;
  }

  @Override
  public boolean abuts(final PartitionChunk<T> other)
  {
    // A tombstone is the sole chunk of its interval, so nothing abuts it.
    return false;
  }

  @Override
  public boolean isStart()
  {
    return true;
  }

  @Override
  public boolean isEnd()
  {
    return true;
  }

  @Override
  public int getChunkNumber()
  {
    return 0;
  }

  @Override
  public int compareTo(PartitionChunk<T> other)
  {
    // Guard clause: comparison is only defined among tombstone chunks.
    if (!(other instanceof TombstonePartitionedChunk)) {
      throw new IllegalArgumentException("Cannot compare against something that is not a TombstonePartitionedChunk.");
    }
    return 0;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean equals(Object o)
  {
    // Same class implies equal, since compareTo treats all tombstone chunks
    // as equivalent regardless of their payload.
    return this == o
           || (o != null && getClass() == o.getClass() && compareTo((TombstonePartitionedChunk<T>) o) == 0);
  }

  @Override
  public int hashCode()
  {
    // Constant hash, consistent with equals treating all instances as equal.
    return Objects.hashCode(0);
  }

  @Override
  public String toString()
  {
    return "TombstonePartitionedChunk{chunkNumber=0, chunks=1, object=" + object + '}';
  }
}
Loading

0 comments on commit 5be3e5b

Please sign in to comment.