Skip to content

Commit

Permalink
Skip tombstone segment refresh in metadata cache (apache#17025)
Browse files Browse the repository at this point in the history
This PR apache#16890 introduced a change to skip adding tombstone segments to the cache.
It turns out that as a side effect tombstone segments appear unavailable in the console. This happens because availability of a segment in Broker is determined from the metadata cache.

The fix is to keep the segment in the metadata cache but skip them from refresh.

This doesn't affect any functionality as metadata query for tombstone returns empty causing continuous refresh of those segments.
  • Loading branch information
findingrish committed Sep 19, 2024
1 parent 11727af commit d7e3e75
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
* <p>
* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override
* with the logic to build and cache table schema.
* <p>
* Note on handling tombstone segments:
* These segments lack data or column information.
* Additionally, segment metadata queries, which are not yet implemented for tombstone segments
* (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
* leading to indefinite refresh attempts for these segments.
* Therefore, these segments are never added to the set of segments being refreshed.
*
* @param <T> The type of information associated with the data source, which must extend {@link DataSourceInformation}.
*/
Expand Down Expand Up @@ -478,13 +485,6 @@ public int getTotalSegments()
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Skip adding tombstone segment to the cache. These segments lack data or column information.
// Additionally, segment metadata queries, which are not yet implemented for tombstone segments
// (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
// leading to indefinite refresh attempts for these segments.
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
Expand All @@ -511,7 +511,11 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
segmentMetadata = AvailableSegmentMetadata
.builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
.build();
markSegmentAsNeedRefresh(segment.getId());
if (segment.isTombstone()) {
log.debug("Skipping refresh for tombstone segment.");
} else {
markSegmentAsNeedRefresh(segment.getId());
}
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment [%s].", segment.getId());
markSegmentAsMutable(segment.getId());
Expand Down Expand Up @@ -557,10 +561,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
// tombstone segments are not present in the cache
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata()
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
return availableSegmentMetadata;
}
}
Expand All @@ -403,9 +401,7 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S
.withNumRows(metadata.get().getNumRows())
.build();
} else {
// mark it for refresh, however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
}
return availableSegmentMetadata;
}
Expand Down Expand Up @@ -686,22 +682,14 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();

if (segmentsMap != null && !segmentsMap.isEmpty()) {
for (SegmentId segmentId : segmentsMap.keySet()) {
for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : segmentsMap.entrySet()) {
SegmentId segmentId = entry.getKey();
Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
if (optionalSchema.isPresent()) {
RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
mergeRowSignature(columnTypes, rowSignature);
} else {
log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
// mark it for refresh only if it is used
// however, this case shouldn't arise by design
markSegmentAsNeedRefresh(segmentId);
}
markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
}
}
} else {
Expand Down Expand Up @@ -876,4 +864,32 @@ Optional<RowSignature> mergeOrCreateRowSignature(
return Optional.empty();
}
}

/**
* A segment schema can go missing. To ensure smooth functioning, segment is marked for refresh.
* It need not be refreshed in the following scenarios:
* - Tombstone segment, since they do not have any schema.
* - Unused segment which hasn't been yet removed from the cache.
* Any other scenario needs investigation.
*/
private void markSegmentForRefreshIfNeeded(DataSegment segment)
{
SegmentId id = segment.getId();

log.debug("SchemaMetadata for segmentId [%s] is absent.", id);

if (segment.isTombstone()) {
log.debug("Skipping refresh for tombstone segment [%s].", id);
return;
}

ImmutableDruidDataSource druidDataSource =
sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());

if (druidDataSource != null && druidDataSource.getSegment(id) != null) {
markSegmentAsNeedRefresh(id);
} else {
log.debug("Skipping refresh for unused segment [%s].", id);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -2220,74 +2221,109 @@ protected void coldDatasourceSchemaExec()
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
public void testTombstoneSegmentIsNotRefreshed() throws IOException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";

TestHelper.makeJsonMapper();
InternalQueryConfig internalQueryConfig = MAPPER.readValue(
MAPPER.writeValueAsString(
MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
),
InternalQueryConfig.class
);

QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);

CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
factoryMock,
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
internalQueryConfig,
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};
);

schema.onLeaderStart();
schema.awaitInitialization();
Map<String, Object> queryContext = ImmutableMap.of(
QueryContexts.PRIORITY_KEY, 5,
QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
);

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
DataSegment segment = newSegment("test", 0);
DataSegment tombstone = DataSegment.builder()
.dataSource("test")
.interval(Intervals.of("2012-01-01/2012-01-02"))
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
.shardSpec(new TombstoneShardSpec())
.loadSpec(Collections.singletonMap(
"type",
DataSegment.TOMBSTONE_LOADSPEC_TYPE
))
.size(0)
.build();

final DruidServer historicalServer = druidServers.stream()
.filter(s -> s.getType().equals(ServerType.HISTORICAL))
.findAny()
.orElse(null);

Assert.assertNotNull(historicalServer);
final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();

schema.addSegment(historicalServerMetadata, segment);
schema.addSegment(historicalServerMetadata, tombstone);
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());

SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
new TableDataSource(segment.getDataSource()),
new MultipleSpecificSegmentSpec(
segmentIterable.stream()
.filter(id -> !id.equals(tombstone.getId()))
.map(SegmentId::toDescriptor)
.collect(Collectors.toList())
),
new AllColumnIncluderator(),
false,
queryContext,
EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
false,
null,
0
null
);

Assert.assertEquals(6, schema.getTotalSegments());
EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
.andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());
EasyMock.replay(factoryMock, lifecycleMock);

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test"));

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
// verify that metadata query is not issued for tombstone segment
EasyMock.verify(factoryMock, lifecycleMock);

// Verify that datasource schema building logic doesn't mark the tombstone segment for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for tombstone segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));

Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);

Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());

// iterating over entire metadata doesn't cause tombstone to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
}

@Test
Expand Down Expand Up @@ -2384,6 +2420,27 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR

Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));

AvailableSegmentMetadata availableSegmentMetadata =
schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId());

Assert.assertNotNull(availableSegmentMetadata);
// fetching metadata for unused segment shouldn't mark it for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));

Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);

Assert.assertEquals(
1,
metadatas.stream()
.filter(
metadata ->
metadata.getSegment().getId().equals(segments.get(0).getId())).count()
);

// iterating over entire metadata doesn't cause unsed segment to be marked for refresh
Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
Expand Down
Loading

0 comments on commit d7e3e75

Please sign in to comment.