From 60ed36c89bc0e5598af56542a74ac503230959d3 Mon Sep 17 00:00:00 2001
From: Rishabh Singh <6513075+findingrish@users.noreply.github.com>
Date: Thu, 19 Sep 2024 14:39:15 +0530
Subject: [PATCH] Skip tombstone segment refresh in metadata cache (#17025)
 (#17112)

PR #16890 introduced a change to skip adding tombstone segments to the
cache. It turns out that, as a side effect, tombstone segments appeared
unavailable in the console. This happens because the availability of a
segment in the Broker is determined from the metadata cache.

The fix is to keep tombstone segments in the metadata cache but skip
them during refresh. This doesn't affect any functionality: the segment
metadata query returns no results for tombstones anyway, which
previously caused those segments to be refreshed continuously.
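In short, addSegment() now caches tombstones like any other segment but
skips the refresh bookkeeping for them. A minimal sketch of the new
behavior (condensed from the patch below; isTombstone() and
markSegmentAsNeedRefresh() are the actual methods involved, the
surrounding code is simplified):

    // The tombstone is cached like any other segment, so the Broker
    // still reports it as available ...
    if (segment.isTombstone()) {
      // ... but it is never queued for a schema refresh: a segment
      // metadata query on a tombstone returns no results, so the
      // segment would otherwise be re-queued for refresh indefinitely.
      log.debug("Skipping refresh for tombstone segment.");
    } else {
      markSegmentAsNeedRefresh(segment.getId());
    }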
---
 .../AbstractSegmentMetadataCache.java         |  24 +--
 .../CoordinatorSegmentMetadataCache.java      |  50 ++++--
 .../CoordinatorSegmentMetadataCacheTest.java  | 155 ++++++++++++------
 .../BrokerSegmentMetadataCacheTest.java       | 141 ++++++++++------
 4 files changed, 241 insertions(+), 129 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
index d918ec5e3f29..99d965ec643e 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java
@@ -102,6 +102,13 @@
  * <p>
  * This class has an abstract method {@link #refresh(Set, Set)} which the child class must override
  * with the logic to build and cache table schema.
+ * <p>
+ * Note on handling tombstone segments:
+ * These segments lack data or column information.
+ * Additionally, segment metadata queries, which are not yet implemented for tombstone segments
+ * (see: https://github.com/apache/druid/pull/12137), do not provide metadata for tombstones,
+ * leading to indefinite refresh attempts for these segments.
+ * Therefore, these segments are never added to the set of segments being refreshed.
  *
  * @param <T> The type of information associated with the data source, which must extend {@link DataSourceInformation}.
  */
@@ -478,13 +485,6 @@ public int getTotalSegments()
   @VisibleForTesting
   public void addSegment(final DruidServerMetadata server, final DataSegment segment)
   {
-    // Skip adding tombstone segment to the cache. These segments lack data or column information.
-    // Additionally, segment metadata queries, which are not yet implemented for tombstone segments
-    // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
-    // leading to indefinite refresh attempts for these segments.
-    if (segment.isTombstone()) {
-      return;
-    }
     // Get lock first so that we won't wait in ConcurrentMap.compute().
     synchronized (lock) {
       // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
@@ -511,7 +511,11 @@ public void addSegment(final DruidServerMetadata server, final DataSegme
         segmentMetadata = AvailableSegmentMetadata
             .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh
             .build();
-        markSegmentAsNeedRefresh(segment.getId());
+        if (segment.isTombstone()) {
+          log.debug("Skipping refresh for tombstone segment.");
+        } else {
+          markSegmentAsNeedRefresh(segment.getId());
+        }
         if (!server.isSegmentReplicationTarget()) {
           log.debug("Added new mutable segment [%s].", segment.getId());
           markSegmentAsMutable(segment.getId());
@@ -557,10 +561,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegme
   @VisibleForTesting
   public void removeSegment(final DataSegment segment)
   {
-    // tombstone segments are not present in the cache
-    if (segment.isTombstone()) {
-      return;
-    }
     // Get lock first so that we won't wait in ConcurrentMap.compute().
     synchronized (lock) {
       log.debug("Segment [%s] is gone.", segment.getId());
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 321c33fa1dbf..24489e60acdc 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -374,9 +374,7 @@ public Iterator<AvailableSegmentMetadata> iterateSegmentMetadata()
                 .withNumRows(metadata.get().getNumRows())
                 .build();
           } else {
-            // mark it for refresh, however, this case shouldn't arise by design
-            markSegmentAsNeedRefresh(segmentId);
-            log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId);
+            markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
             return availableSegmentMetadata;
           }
         }
@@ -403,9 +401,7 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S
           .withNumRows(metadata.get().getNumRows())
           .build();
     } else {
-      // mark it for refresh, however, this case shouldn't arise by design
-      markSegmentAsNeedRefresh(segmentId);
-      log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
+      markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment());
     }
     return availableSegmentMetadata;
   }
@@ -686,22 +682,14 @@ public RowSignature buildDataSourceRowSignature(final String dataSource)
     final Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
 
     if (segmentsMap != null && !segmentsMap.isEmpty()) {
-      for (SegmentId segmentId : segmentsMap.keySet()) {
+      for (Map.Entry<SegmentId, AvailableSegmentMetadata> entry : segmentsMap.entrySet()) {
+        SegmentId segmentId = entry.getKey();
         Optional<SchemaPayloadPlus> optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId);
         if (optionalSchema.isPresent()) {
           RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature();
           mergeRowSignature(columnTypes, rowSignature);
         } else {
-          log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId);
-
-          ImmutableDruidDataSource druidDataSource =
-              sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource());
-
-          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
-            // mark it for refresh only if it is used
-            // however, this case shouldn't arise by design
-            markSegmentAsNeedRefresh(segmentId);
-          }
+          markSegmentForRefreshIfNeeded(entry.getValue().getSegment());
         }
       }
     } else {
@@ -876,4 +864,32 @@ Optional<RowSignature> mergeOrCreateRowSignature(
       return Optional.empty();
     }
   }
+
+  /**
+   * A segment schema can go missing. To ensure smooth functioning, the segment is marked for refresh.
+   * It need not be refreshed in the following scenarios:
+   * - Tombstone segment, since tombstones do not have any schema.
+   * - Unused segment which hasn't yet been removed from the cache.
+   * Any other scenario needs investigation.
+   */
+  private void markSegmentForRefreshIfNeeded(DataSegment segment)
+  {
+    SegmentId id = segment.getId();
+
+    log.debug("SchemaMetadata for segmentId [%s] is absent.", id);
+
+    if (segment.isTombstone()) {
+      log.debug("Skipping refresh for tombstone segment [%s].", id);
+      return;
+    }
+
+    ImmutableDruidDataSource druidDataSource =
+        sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource());
+
+    if (druidDataSource != null && druidDataSource.getSegment(id) != null) {
+      markSegmentAsNeedRefresh(id);
+    } else {
+      log.debug("Skipping refresh for unused segment [%s].", id);
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 0c099cb551cb..22b0890e855e 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -32,6 +32,7 @@
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
@@ -2220,74 +2221,109 @@ protected void coldDatasourceSchemaExec()
   }
 
   @Test
-  public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+  public void testTombstoneSegmentIsNotRefreshed() throws IOException
   {
-    String datasource = "newSegmentAddTest";
-    CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
+
+    TestHelper.makeJsonMapper();
+    InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
+        ),
+        InternalQueryConfig.class
+    );
+
+    QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
+    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
     CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
-        getQueryLifecycleFactory(walker),
+        factoryMock,
         serverView,
         SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
-        new InternalQueryConfig(),
+        internalQueryConfig,
         new NoopServiceEmitter(),
         segmentSchemaCache,
         backFillQueue,
         sqlSegmentsMetadataManager,
         segmentsMetadataManagerConfigSupplier
-    )
-    {
-      @Override
-      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
-      {
-        super.addSegment(server, segment);
-        if (datasource.equals(segment.getDataSource())) {
-          addSegmentLatch.countDown();
-        }
-      }
-    };
+    );
 
-    schema.onLeaderStart();
-    schema.awaitInitialization();
+    Map<String, Object> queryContext = ImmutableMap.of(
+        QueryContexts.PRIORITY_KEY, 5,
+        QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+    );
 
-    DataSegment segment = new DataSegment(
-        datasource,
-        Intervals.of("2001/2002"),
-        "1",
-        Collections.emptyMap(),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        TombstoneShardSpec.INSTANCE,
-        null,
+    DataSegment segment = newSegment("test", 0);
+    DataSegment tombstone = DataSegment.builder()
+                                       .dataSource("test")
+                                       .interval(Intervals.of("2012-01-01/2012-01-02"))
+                                       .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+                                       .shardSpec(new TombstoneShardSpec())
+                                       .loadSpec(Collections.singletonMap(
+                                           "type",
+                                           DataSegment.TOMBSTONE_LOADSPEC_TYPE
+                                       ))
+                                       .size(0)
+                                       .build();
+
+    final DruidServer historicalServer = druidServers.stream()
+                                                     .filter(s -> s.getType().equals(ServerType.HISTORICAL))
+                                                     .findAny()
+                                                     .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
+
+    schema.addSegment(historicalServerMetadata, segment);
+    schema.addSegment(historicalServerMetadata, tombstone);
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
+
+    SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(segment.getDataSource()),
+        new MultipleSpecificSegmentSpec(
+            segmentIterable.stream()
+                           .filter(id -> !id.equals(tombstone.getId()))
+                           .map(SegmentId::toDescriptor)
+                           .collect(Collectors.toList())
+        ),
+        new AllColumnIncluderator(),
+        false,
+        queryContext,
+        EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS),
+        false,
         null,
-        0
+        null
     );
 
-    Assert.assertEquals(6, schema.getTotalSegments());
+    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+    EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+            .andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once();
 
-    serverView.addSegment(segment, ServerType.HISTORICAL);
-    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
-    Assert.assertEquals(0, addSegmentLatch.getCount());
+    EasyMock.replay(factoryMock, lifecycleMock);
 
-    Assert.assertEquals(6, schema.getTotalSegments());
-    List<AvailableSegmentMetadata> metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test"));
 
-    serverView.removeSegment(segment, ServerType.HISTORICAL);
-    Assert.assertEquals(6, schema.getTotalSegments());
-    metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    // verify that a metadata query is not issued for the tombstone segment
+    EasyMock.verify(factoryMock, lifecycleMock);
+
+    // verify that the datasource schema building logic doesn't mark the tombstone segment for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for the tombstone segment shouldn't mark it for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
+
+    // iterating over the entire metadata doesn't cause the tombstone to be marked for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
   }
 
   @Test
@@ -2384,6 +2420,27 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR
     Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId()));
     Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata =
+        schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId());
+
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for an unused segment shouldn't mark it for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(
+        1,
+        metadatas.stream()
+                 .filter(
+                     metadata ->
+                         metadata.getSegment().getId().equals(segments.get(0).getId())).count()
+    );
+
+    // iterating over the entire metadata doesn't cause the unused segment to be marked for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId()));
   }
 
   private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index d9b24ed011dc..b613c602f633 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -37,6 +37,7 @@
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -1139,71 +1140,109 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce
   }
 
   @Test
-  public void testTombstoneSegmentIsNotAdded() throws InterruptedException
+  public void testTombstoneSegmentIsNotRefreshed() throws IOException
   {
-    String datasource = "newSegmentAddTest";
-    CountDownLatch addSegmentLatch = new CountDownLatch(1);
+    String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";
+
+    TestHelper.makeJsonMapper();
+    InternalQueryConfig internalQueryConfig = MAPPER.readValue(
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class)
+        ),
+        InternalQueryConfig.class
+    );
+
+    QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class);
+    QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class);
+
     BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
-        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        factoryMock,
         serverView,
-        BrokerSegmentMetadataCacheConfig.create(),
+        SEGMENT_CACHE_CONFIG_DEFAULT,
         new NoopEscalator(),
-        new InternalQueryConfig(),
+        internalQueryConfig,
         new NoopServiceEmitter(),
         new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
         new NoopCoordinatorClient(),
         CentralizedDatasourceSchemaConfig.create()
-    )
-    {
-      @Override
-      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
-      {
-        super.addSegment(server, segment);
-        if (datasource.equals(segment.getDataSource())) {
-          addSegmentLatch.countDown();
-        }
-      }
-    };
+    );
 
-    schema.start();
-    schema.awaitInitialization();
+    Map<String, Object> queryContext = ImmutableMap.of(
+        QueryContexts.PRIORITY_KEY, 5,
+        QueryContexts.BROKER_PARALLEL_MERGE_KEY, false
+    );
 
-    DataSegment segment = new DataSegment(
-        datasource,
-        Intervals.of("2001/2002"),
-        "1",
-        Collections.emptyMap(),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        TombstoneShardSpec.INSTANCE,
-        null,
+    DataSegment segment = newSegment("test", 0);
+    DataSegment tombstone = DataSegment.builder()
+                                       .dataSource("test")
+                                       .interval(Intervals.of("2012-01-01/2012-01-02"))
+                                       .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+                                       .shardSpec(new TombstoneShardSpec())
+                                       .loadSpec(Collections.singletonMap(
+                                           "type",
+                                           DataSegment.TOMBSTONE_LOADSPEC_TYPE
+                                       ))
+                                       .size(0)
+                                       .build();
+
+    final ImmutableDruidServer historicalServer = druidServers.stream()
+                                                              .filter(s -> s.getType().equals(ServerType.HISTORICAL))
+                                                              .findAny()
+                                                              .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata();
+
+    schema.addSegment(historicalServerMetadata, segment);
+    schema.addSegment(historicalServerMetadata, tombstone);
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    List<SegmentId> segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId());
+
+    SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery(
+        new TableDataSource(segment.getDataSource()),
+        new MultipleSpecificSegmentSpec(
+            segmentIterable.stream()
+                           .filter(id -> !id.equals(tombstone.getId()))
+                           .map(SegmentId::toDescriptor)
+                           .collect(Collectors.toList())
+        ),
+        new AllColumnIncluderator(),
+        false,
+        queryContext,
+        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
         null,
-        0
+        null
     );
 
-    Assert.assertEquals(6, schema.getTotalSegments());
+    EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once();
+    EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK))
+            .andReturn(QueryResponse.withEmptyContext(Sequences.empty()));
 
-    serverView.addSegment(segment, ServerType.HISTORICAL);
-    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
-    Assert.assertEquals(0, addSegmentLatch.getCount());
+    EasyMock.replay(factoryMock, lifecycleMock);
 
-    Assert.assertEquals(6, schema.getTotalSegments());
-    List<AvailableSegmentMetadata> metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
-
-    serverView.removeSegment(segment, ServerType.HISTORICAL);
-    Assert.assertEquals(6, schema.getTotalSegments());
-    metadatas = schema
-        .getSegmentMetadataSnapshot()
-        .values()
-        .stream()
-        .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
-        .collect(Collectors.toList());
-    Assert.assertEquals(0, metadatas.size());
+    Set<SegmentId> segmentsToRefresh = new HashSet<>();
+    segmentsToRefresh.add(segment.getId());
+    schema.refresh(segmentsToRefresh, Collections.singleton("test"));
+
+    // verify that a metadata query is not issued for the tombstone segment
+    EasyMock.verify(factoryMock, lifecycleMock);
+
+    // verify that the datasource schema building logic doesn't mark the tombstone segment for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId());
+    Assert.assertNotNull(availableSegmentMetadata);
+    // fetching metadata for the tombstone segment shouldn't mark it for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
+
+    Set<AvailableSegmentMetadata> metadatas = new HashSet<>();
+    schema.iterateSegmentMetadata().forEachRemaining(metadatas::add);
+
+    Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count());
+
+    // iterating over the entire metadata doesn't cause the tombstone to be marked for refresh
+    Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId()));
   }
 }