Batch ingestion replace #12137

Merged
merged 46 commits into from
Mar 9, 2022
Changes from 40 commits
682f957
Tombstone support for replace functionality
Dec 21, 2021
2be134d
A used segment interval is the interval of a current used segment tha…
Jan 11, 2022
26177b6
Update compaction test to match replace behavior
Jan 12, 2022
3b99a81
Adapt ITAutoCompactionTest to work with tombstones rather than droppi…
Jan 14, 2022
ac44e56
Style plus simple queriableindex test
Jan 14, 2022
86fd028
Add segment cache loader tombstone test
Jan 15, 2022
4855929
Add more tests
Jan 15, 2022
f2e52ad
Add a method to the LogicalSegment to test whether it has any data
Jan 18, 2022
84ea1a7
Test filter with some empty logical segments
Jan 18, 2022
b66d8a1
Refactor more compaction/dropexisting tests
Jan 19, 2022
4d5df0a
Code coverage
Jan 19, 2022
b4a935a
Support for all empty segments
Jan 20, 2022
6481003
Skip tombstones when looking-up broker's timeline. Discard changes ma…
Jan 20, 2022
6141c18
Fix null ptr when segment does not have a queriable index
Jan 21, 2022
5428675
Add support for empty replace interval (all input data has been filte…
Jan 25, 2022
3352fb6
Fixed coverage & style
Jan 27, 2022
173c2e1
Find tombstone versions from lock versions
Jan 28, 2022
d77a0c4
Test failures & style
Jan 28, 2022
46cc72e
Interner was making this fail since the two segments were consider eq…
Jan 28, 2022
a685cc6
Cleanup tombstone version code
Jan 28, 2022
d0fbef9
Force timeChunkLock whenever replace (i.e. dropExisting=true) is bein…
Feb 1, 2022
a7ec8dc
Reject replace spec when input intervals are empty
Feb 2, 2022
faf2720
Documentation
Feb 3, 2022
4fe9548
Style and unit test
Feb 3, 2022
404072c
Restore test code deleted by mistake
Feb 4, 2022
c6d0271
Allocate forces TIME_CHUNK locking and uses lock versions. TombstoneS…
Feb 10, 2022
617eb0b
Unused imports. Dead code. Test coverage.
Feb 21, 2022
a977f21
Coverage.
Feb 22, 2022
151d674
Prevent killer from throwing an exception for tombstones. This is the…
Feb 23, 2022
3f5d1dc
Fix OmniKiller + more test coverage.
Feb 23, 2022
c62d36b
Tombstones are now marked using a shard spec
Feb 23, 2022
0ad63a8
Drop a segment factory.json in the segment cache for tombstones
Feb 25, 2022
8680fe4
Merge branch 'master' into batch_ingestion_replace
Feb 27, 2022
55c0a38
Style
Feb 28, 2022
9f7cc10
Style + coverage
Feb 28, 2022
f4c3440
Merge branch 'temp' into batch_ingestion_replace
Feb 28, 2022
15f9db3
style
Feb 28, 2022
1f6a80e
Add TombstoneLoadSpec.class to mapper in test
Mar 1, 2022
db30f5f
Merge branch 'master' into batch_ingestion_replace
Mar 5, 2022
ca1d5a2
Merge branch 'master' into batch_ingestion_replace
Mar 5, 2022
36d638c
Merge branch 'master' into batch_ingestion_replace
Mar 8, 2022
c14cdfc
Update core/src/main/java/org/apache/druid/segment/loading/TombstoneL…
Mar 8, 2022
fcd2833
Update docs/configuration/index.md
Mar 8, 2022
d6a0179
Typo
Mar 8, 2022
c966bdb
Integrated replace with an existing test since the replace part was r…
Mar 8, 2022
933a3fc
Range does not work with multi-dim
Mar 8, 2022
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.initialization;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.segment.loading.TombstoneLoadSpec;

import java.util.List;

public class TombstoneDataStorageModule implements DruidModule
{
public static final String SCHEME = "tombstone";

@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new Module()
{
@Override
public String getModuleName()
{
return "DruidTombstoneStorage-" + System.identityHashCode(this);
}

@Override
public Version version()
{
return Version.unknownVersion();
}

@Override
public void setupModule(SetupContext context)
{
context.registerSubtypes(TombstoneLoadSpec.class);
}
}
);
}

@Override
public void configure(Binder binder)
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import org.apache.druid.initialization.TombstoneDataStorageModule;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

@JsonTypeName(TombstoneDataStorageModule.SCHEME)
public class TombstoneLoadSpec implements LoadSpec
{
@Override
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException
{
try {
return new LoadSpecResult(writeFactoryFile(destDir));
}
catch (IOException e) {
// keep the original IOException as the cause so the failure reason is not lost
throw new SegmentLoadingException(
e,
"Failed to create factory.json for tombstone in dir [%s]",
destDir.getAbsolutePath()
);
}
}

@VisibleForTesting
public static int writeFactoryFile(File destDir) throws IOException
{
final String factoryJSONString = "{\"type\":\"tombstoneSegmentFactory\"}";
final File factoryJson = new File(destDir, "factory.json");
factoryJson.createNewFile();
Files.write(factoryJSONString.getBytes(StandardCharsets.UTF_8), factoryJson);
return factoryJSONString.length();
}
}
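For readers who want to try the idea outside Druid, here is a minimal standalone sketch of what the load spec above does: it drops a tiny `factory.json` marker into the segment directory and reports how many bytes it wrote. This is an illustration only, not the code in this PR. The class name `TombstoneFactoryFileSketch` and its method are hypothetical, and the sketch uses `java.nio.file` instead of Guava and Druid types.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical standalone sketch; not part of Druid.
final class TombstoneFactoryFileSketch
{
  // Same JSON payload as the string literal in the diff above.
  static final String FACTORY_JSON = "{\"type\":\"tombstoneSegmentFactory\"}";

  // Writes factory.json into destDir and returns the number of bytes written.
  static long writeFactoryFile(Path destDir) throws IOException
  {
    final byte[] bytes = FACTORY_JSON.getBytes(StandardCharsets.UTF_8);
    // Assumption of this sketch: create the directory if it does not exist yet.
    Files.createDirectories(destDir);
    // Files.write creates the file, or truncates it if it already exists.
    Files.write(destDir.resolve("factory.json"), bytes);
    return bytes.length;
  }
}
```

Returning the byte length rather than the string length keeps the reported size correct even if the payload ever contains non-ASCII characters.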
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/DataSegment.java
Expand Up @@ -56,6 +56,9 @@
@PublicApi
public class DataSegment implements Comparable<DataSegment>, Overshadowable<DataSegment>
{

public static final String TOMBSTONE_LOADSPEC_TYPE = "tombstone";

/*
* The difference between this class and org.apache.druid.segment.Segment is that this class contains the segment
* metadata only, while org.apache.druid.segment.Segment represents the actual body of segment data, queryable.
Expand Down Expand Up @@ -211,6 +214,7 @@ public DataSegment(
)
{
this.id = SegmentId.of(dataSource, interval, version, shardSpec);
// prune loadspec if needed
this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : prepareLoadSpec(loadSpec);
// Deduplicating dimensions and metrics lists as a whole because they are very likely the same for the same
// dataSource
Expand Down Expand Up @@ -343,6 +347,11 @@ public SegmentId getId()
return id;
}

public boolean isTombstone()
{
return getShardSpec().getType().equals(ShardSpec.Type.TOMBSTONE);
}

@Override
public boolean overshadows(DataSegment other)
{
Expand Down Expand Up @@ -596,4 +605,11 @@ public DataSegment build()
);
}
}

@Override
public boolean hasData()
{
return !isTombstone();
}

}
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/druid/timeline/LogicalSegment.java
Expand Up @@ -41,7 +41,24 @@
@PublicApi
public interface LogicalSegment
{

// With the addition of tombstones, the broker and other callers may need to know
// whether a logical segment actually holds data.
// This is an enum rather than a boolean because, once the broker becomes aware of
// tombstones that should have been loaded but were not, a third value (MISSING)
// may be needed.
enum Status
{
READY, /* It has data and it is ready */
EMPTY /* It has no data */
}

Interval getInterval();

Interval getTrueInterval();

default Status getStatus()
{
return Status.READY;
}
}
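A caller that only cares about intervals with data could use the new status roughly as follows. This is a hedged sketch, not Druid code: `LogicalSegmentStatusSketch`, its nested `Segment` interface, and the `ready`/`tombstone` helpers are hypothetical stand-ins for `LogicalSegment`, and intervals are plain strings rather than Joda `Interval` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch mirroring the shape of LogicalSegment.Status.
final class LogicalSegmentStatusSketch
{
  enum Status { READY, EMPTY }

  interface Segment
  {
    String interval();

    // Like the default in the diff: segments are READY unless they say otherwise.
    default Status status()
    {
      return Status.READY;
    }
  }

  // A regular segment: relies on the default READY status.
  static Segment ready(String interval)
  {
    return () -> interval;
  }

  // A tombstone-like segment: covers an interval but reports EMPTY.
  static Segment tombstone(String interval)
  {
    return new Segment()
    {
      @Override
      public String interval()
      {
        return interval;
      }

      @Override
      public Status status()
      {
        return Status.EMPTY;
      }
    };
  }

  // Returns the intervals of segments that actually hold data, in input order.
  static List<String> intervalsWithData(List<Segment> segments)
  {
    final List<String> result = new ArrayList<>();
    for (Segment segment : segments) {
      if (segment.status() == Status.READY) {
        result.add(segment.interval());
      }
    }
    return result;
  }
}
```

Because the default is `READY`, existing implementations keep their behavior, and only tombstone-backed segments need to override the method.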
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ default boolean containsRootPartition(T other)
* {@link VersionedIntervalTimeline}.
*/
short getAtomicUpdateGroupSize();

default boolean hasData()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,13 @@ public String toString()
", object=" + object +
'}';
}

@Override
public LogicalSegment.Status getStatus()
{
if (object.hasData()) {
return Status.READY;
}
return Status.EMPTY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,20 @@ public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterato

private final Comparator<? super VersionType> versionComparator;

// Set this to true if the client needs to skip tombstones upon lookup (like the broker)
private boolean skipObjectsWithNoData = false;

public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
{
this.versionComparator = versionComparator;
}

public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparator, boolean skipObjectsWithNoData)
{
this(versionComparator);
this.skipObjectsWithNoData = skipObjectsWithNoData;
}

public static void addSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
Iterator<DataSegment> segments
Expand Down Expand Up @@ -743,7 +752,9 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
Interval timelineInterval = entry.getKey();
TimelineEntry val = entry.getValue();

if (timelineInterval.overlaps(interval)) {
// When requested, exclude partition holders with no data (i.e. tombstones): they contain
// no rows, so they add nothing for higher-level code.
if ((!skipObjectsWithNoData || val.partitionHolder.hasData()) && timelineInterval.overlaps(interval)) {
retVal.add(
new TimelineObjectHolder<>(
timelineInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,16 @@ public String toString()
"overshadowableManager=" + overshadowableManager +
'}';
}

public boolean hasData()
{
// it has data as long as one of the payloads has data, otherwise it does not
for (T payload : payloads()) {
if (payload.hasData()) {
return true;
}
}
return false;
}

}
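The interaction between the holder-level `hasData()` check and the `skipObjectsWithNoData` flag in the timeline lookup can be sketched in isolation. The following is a simplified illustration under stated assumptions, not the Druid implementation: `SkipEmptyHoldersSketch`, `Payload`, and `Holder` are hypothetical names, and intervals are half-open `[start, end)` ranges of `long` values instead of Joda intervals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch of "skip holders with no data" during lookup.
final class SkipEmptyHoldersSketch
{
  interface Payload
  {
    boolean hasData();
  }

  static final class Holder
  {
    final long start;
    final long end;
    final List<Payload> payloads;

    Holder(long start, long end, List<Payload> payloads)
    {
      this.start = start;
      this.end = end;
      this.payloads = payloads;
    }

    // The holder has data as soon as at least one payload has data.
    boolean hasData()
    {
      for (Payload payload : payloads) {
        if (payload.hasData()) {
          return true;
        }
      }
      return false;
    }

    // Half-open interval overlap test.
    boolean overlaps(long queryStart, long queryEnd)
    {
      return start < queryEnd && queryStart < end;
    }
  }

  // Same predicate shape as the lookup change in the diff above.
  static List<Holder> lookup(
      List<Holder> timeline,
      long queryStart,
      long queryEnd,
      boolean skipObjectsWithNoData
  )
  {
    final List<Holder> result = new ArrayList<>();
    for (Holder holder : timeline) {
      if ((!skipObjectsWithNoData || holder.hasData()) && holder.overlaps(queryStart, queryEnd)) {
        result.add(holder);
      }
    }
    return result;
  }
}
```

With the flag off, tombstone-only holders are still returned, which matters for callers that need to see which intervals were overwritten. With the flag on, a query over an interval covered only by tombstones returns nothing.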
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = ShardSpec.Type.NONE, value = NoneShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.TOMBSTONE, value = TombstoneShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.SINGLE, value = SingleDimensionShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.RANGE, value = DimensionRangeShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.LINEAR, value = LinearShardSpec.class),
Expand Down Expand Up @@ -168,5 +169,7 @@ interface Type
String BUCKET_HASH = "bucket_hash";
String BUCKET_SINGLE_DIM = "bucket_single_dim";
String BUCKET_RANGE = "bucket_range";

String TOMBSTONE = "tombstone";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline.partition;

import com.google.common.base.Objects;

public class TombstonePartitionedChunk<T> implements PartitionChunk<T>
{
private final T object;

public static <T> TombstonePartitionedChunk<T> make(T obj)
{
return new TombstonePartitionedChunk<T>(obj);
}

public TombstonePartitionedChunk(T object)
{
this.object = object;
}

@Override
public T getObject()
{
return object;
}

@Override
public boolean abuts(final PartitionChunk<T> other)
{
return false;
}

@Override
public boolean isStart()
{
return true;
}

@Override
public boolean isEnd()
{
return true;
}

@Override
public int getChunkNumber()
{
return 0;
}

@Override
public int compareTo(PartitionChunk<T> other)
{
if (other instanceof TombstonePartitionedChunk) {
return 0;
} else {
throw new IllegalArgumentException("Cannot compare against something that is not a TombstonePartitionedChunk.");
}
}

@Override
@SuppressWarnings("unchecked")
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

return compareTo((TombstonePartitionedChunk<T>) o) == 0;
}

@Override
public int hashCode()
{
return Objects.hashCode(0);
}

@Override
public String toString()
{
return "TombstonePartitionedChunk{" +
"chunkNumber=" + 0 +
", chunks=" + 1 +
", object=" + object +
'}';
}
}