diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 76e4f9745762..dd86ad6df37d 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.UnmodifiableIterator;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -40,6 +42,7 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -56,6 +59,13 @@ public class SqlSegmentsMetadataQuery
 {
   private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);
 
+  /**
+   * Maximum number of intervals to query in a single batch.
+   * This is similar to {@link IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but applied
+   * to the number of intervals.
+   */
+  private static final int MAX_INTERVALS_PER_BATCH = 100;
+
   private final Handle handle;
   private final SQLMetadataConnector connector;
   private final MetadataStorageTablesConfig dbTables;
@@ -344,6 +354,51 @@ private CloseableIterator<DataSegment> retrieveSegments(
       final boolean used,
       @Nullable final Integer limit
   )
+  {
+    if (intervals.isEmpty()) {
+      // Nothing to batch: the per-batch filter keeps every segment when the interval list is empty.
+      return CloseableIterators.withEmptyBaggage(
+          retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit)
+      );
+    } else {
+      // Query at most MAX_INTERVALS_PER_BATCH intervals at a time and concatenate the per-batch results.
+      // NOTE(review): a segment matching intervals that fall in different batches appears to be returned once per
+      // batch, and the result is wrapped with empty baggage whereas the previous code passed resultIterator to
+      // CloseableIterators.wrap -- confirm that duplicates and the unclosed result iterators are acceptable.
+      final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
+      final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
+      Integer limitPerBatch = limit;
+
+      for (final List<Interval> intervalList : intervalsLists) {
+        final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch);
+        if (limitPerBatch != null) {
+          // A limit was requested: materialize this batch so the remaining limit can be reduced for the next
+          // batch, and stop early once the requested number of segments has been retrieved.
+          final List<DataSegment> dataSegments = ImmutableList.copyOf(iterator);
+          resultingIterators.add(dataSegments.iterator());
+          if (dataSegments.size() >= limitPerBatch) {
+            break;
+          }
+          limitPerBatch -= dataSegments.size();
+        } else {
+          resultingIterators.add(iterator);
+        }
+      }
+      return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
+    }
+  }
+
+  /**
+   * Runs the segment query for one batch of intervals. The returned iterator re-checks every segment against the
+   * given intervals using {@code matchMode}; when {@code intervals} is empty, every segment from the query is kept.
+   */
+  private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
+      final String dataSource,
+      final Collection<Interval> intervals,
+      final IntervalMode matchMode,
+      final boolean used,
+      @Nullable final Integer limit
+  )
   {
     // Check if the intervals all support comparing as strings. If so, bake them into the SQL.
     final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
@@ -372,27 +427,24 @@ private CloseableIterator<DataSegment> retrieveSegments(
         sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
            .iterator();
 
-    return CloseableIterators.wrap(
-        Iterators.filter(
-            resultIterator,
-            dataSegment -> {
-              if (intervals.isEmpty()) {
+    return Iterators.filter(
+        resultIterator,
+        dataSegment -> {
+          if (intervals.isEmpty()) {
+            return true;
+          } else {
+            // Must re-check that the interval matches, even if comparing as string, because the *segment interval*
+            // might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
+            // segment interval like "20010/20011".)
+            for (Interval interval : intervals) {
+              if (matchMode.apply(interval, dataSegment.getInterval())) {
                 return true;
-              } else {
-                // Must re-check that the interval matches, even if comparing as string, because the *segment interval*
-                // might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
-                // segment interval like "20010/20011".)
-                for (Interval interval : intervals) {
-                  if (matchMode.apply(interval, dataSegment.getInterval())) {
-                    return true;
-                  }
-                }
-
-                return false;
               }
             }
-        ),
-        resultIterator
+
+            return false;
+          }
+        }
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index a8fc9e923c5c..888c10067176 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -38,6 +38,7 @@
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -69,6 +70,7 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -1062,6 +1064,213 @@ public void testMultiIntervalUsedList() throws IOException
     ).containsOnlyOnce(defaultSegment3);
   }
 
+  @Test
+  public void testRetrieveUsedSegmentsUsingMultipleIntervals() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());
+
+    final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
+        DS.WIKI,
+        intervals,
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(segments.size(), actualUsedSegments.size());
+    Assert.assertTrue(actualUsedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveAllUsedSegmentsUsingIntervalsOutOfRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval)));
+
+    final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
+        DS.WIKI,
+        ImmutableList.of(outOfRangeInterval),
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(0, actualUsedSegments.size());
+  }
+
+  @Test
+  public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+
+    final Collection<DataSegment> actualUsedSegments = coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    Assert.assertEquals(segments.size(), actualUsedSegments.size());
+    Assert.assertTrue(actualUsedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        null
+    );
+
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size();
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        requestedLimit
+    );
+
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size() - 1;
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        requestedLimit
+    );
+
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int limit = segments.size() + 1;
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        Intervals.of("1900/3000"),
+        limit
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval)));
+    final int limit = segments.size() + 1;
+
+    final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
+        DS.WIKI,
+        outOfRangeInterval,
+        limit
+    );
+    Assert.assertEquals(0, actualUnusedSegments.size());
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        null
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size()
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(segments.containsAll(actualUnusedSegments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final int requestedLimit = segments.size() - 1;
+    final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
+        requestedLimit
+    );
+    Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
+        segments.size() + 1
+    );
+    Assert.assertEquals(segments.size(), actualUnusedSegments.size());
+    Assert.assertTrue(actualUnusedSegments.containsAll(segments));
+  }
+
+  @Test
+  public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOException
+  {
+    final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
+    markAllSegmentsUnused(new HashSet<>(segments));
+
+    final Interval outOfRangeInterval = Intervals.of("1700/1800");
+    Assert.assertTrue(segments.stream()
+                              .noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval)));
+
+    final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+        ImmutableList.of(outOfRangeInterval),
+        null
+    );
+    Assert.assertEquals(0, actualUnusedSegments.size());
+  }
+
   @Test
   public void testSimpleUnusedList() throws IOException
   {
@@ -2713,4 +2922,49 @@ private DataSegment createSegment(Interval interval, String version, ShardSpec s
                       .size(100)
                       .build();
   }
+
+  /**
+   * Creates one segment per year in {@code [startYear, endYear)}, commits them all, and returns them.
+   */
+  private List<DataSegment> createAndGetUsedYearSegments(final int startYear, final int endYear) throws IOException
+  {
+    final List<DataSegment> segments = new ArrayList<>();
+
+    for (int year = startYear; year < endYear; year++) {
+      segments.add(createSegment(
+          Intervals.of("%d/%d", year, year + 1),
+          "version",
+          new LinearShardSpec(0))
+      );
+    }
+    final Set<DataSegment> segmentsSet = new HashSet<>(segments);
+    final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
+    Assert.assertTrue(committedSegments.containsAll(segmentsSet));
+
+    return segments;
+  }
+
+  /**
+   * Reads unused segments of {@code DS.WIKI} for the given intervals directly through
+   * {@link SqlSegmentsMetadataQuery}, inside a read-only transaction, closing the iterator after copying it.
+   */
+  private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
+      final List<Interval> intervals,
+      final Integer limit
+  )
+  {
+    return derbyConnector.inReadOnlyTransaction(
+        (handle, status) -> {
+          try (final CloseableIterator<DataSegment> iterator =
+                   SqlSegmentsMetadataQuery.forHandle(
+                                               handle,
+                                               derbyConnector,
+                                               derbyConnectorRule.metadataTablesConfigSupplier().get(),
+                                               mapper
+                                           )
+                                           .retrieveUnusedSegments(DS.WIKI, intervals, limit)) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+  }
 }