Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch segment retrieval from the metadata store #15305

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -40,6 +42,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -56,6 +59,13 @@ public class SqlSegmentsMetadataQuery
{
private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);

/**
* Maximum number of intervals to consider for a batch.
* This is similar to {@link IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but imposed
* on the intervals size.
*/
private static final int MAX_INTERVALS_PER_BATCH = 100;

private final Handle handle;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig dbTables;
Expand Down Expand Up @@ -344,6 +354,42 @@ private CloseableIterator<DataSegment> retrieveSegments(
final boolean used,
@Nullable final Integer limit
)
{
if (intervals.isEmpty()) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;

for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
final List<DataSegment> dataSegments = ImmutableList.copyOf(iterator);
resultingIterators.add(dataSegments.iterator());
if (dataSegments.size() >= limitPerBatch) {
break;
}
limitPerBatch -= dataSegments.size();
} else {
resultingIterators.add(iterator);
}
}
return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
}
}

private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
Expand Down Expand Up @@ -372,27 +418,24 @@ private CloseableIterator<DataSegment> retrieveSegments(
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();

return CloseableIterators.wrap(
Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
}
}

return false;
}
}
),
resultIterator

return false;
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -69,6 +70,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1062,6 +1064,213 @@ public void testMultiIntervalUsedList() throws IOException
).containsOnlyOnce(defaultSegment3);
}

@Test
public void testRetrieveUsedSegmentsUsingMultipleIntervals() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
final List<Interval> intervals = segments.stream().map(DataSegment::getInterval).collect(Collectors.toList());

final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
DS.WIKI,
intervals,
Segments.ONLY_VISIBLE
);

Assert.assertEquals(segments.size(), actualUsedSegments.size());
Assert.assertTrue(actualUsedSegments.containsAll(segments));
}

@Test
public void testRetrieveAllUsedSegmentsUsingIntervalsOutOfRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
Fixed Show fixed Hide fixed

final Interval outOfRangeInterval = Intervals.of("1700/1800");
Assert.assertTrue(segments.stream()
.anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval)));

final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
DS.WIKI,
ImmutableList.of(outOfRangeInterval),
Segments.ONLY_VISIBLE
);

Assert.assertEquals(0, actualUsedSegments.size());
}

@Test
public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);

final Collection<DataSegment> actualUsedSegments = coordinator.retrieveAllUsedSegments(
DS.WIKI,
Segments.ONLY_VISIBLE
);

Assert.assertEquals(segments.size(), actualUsedSegments.size());
Assert.assertTrue(actualUsedSegments.containsAll(segments));
}

@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
null
);

Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final int requestedLimit = segments.size();
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
requestedLimit
);

Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final int requestedLimit = segments.size() - 1;
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
requestedLimit
);

Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
}

@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final int limit = segments.size() + 1;
final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
Intervals.of("1900/3000"),
limit
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
markAllSegmentsUnused(new HashSet<>(segments));

final Interval outOfRangeInterval = Intervals.of("1700/1800");
Assert.assertTrue(segments.stream()
.anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval)));
final int limit = segments.size() + 1;

final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
DS.WIKI,
outOfRangeInterval,
limit
);
Assert.assertEquals(0, actualUnusedSegments.size());
}

@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
null
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
}

@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size()
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(segments.containsAll(actualUnusedSegments));
}

@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final int requestedLimit = segments.size() - 1;
final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()),
requestedLimit
);
Assert.assertEquals(requestedLimit, actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList())));
}

@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
markAllSegmentsUnused(new HashSet<>(segments));

final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.size() + 1
);
Assert.assertEquals(segments.size(), actualUnusedSegments.size());
Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

@Test
public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOException
{
final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
markAllSegmentsUnused(new HashSet<>(segments));

final Interval outOfRangeInterval = Intervals.of("1700/1800");
Assert.assertTrue(segments.stream()
.anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval)));

final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
ImmutableList.of(outOfRangeInterval),
null
);
Assert.assertEquals(0, actualUnusedSegments.size());
}

@Test
public void testSimpleUnusedList() throws IOException
{
Expand Down Expand Up @@ -2713,4 +2922,43 @@ private DataSegment createSegment(Interval interval, String version, ShardSpec s
.size(100)
.build();
}

private List<DataSegment> createAndGetUsedYearSegments(final int startYear, final int endYear) throws IOException
{
final List<DataSegment> segments = new ArrayList<>();

for (int year = startYear; year < endYear; year++) {
segments.add(createSegment(
Intervals.of("%d/%d", year, year + 1),
"version",
new LinearShardSpec(0))
);
}
final Set<DataSegment> segmentsSet = new HashSet<>(segments);
final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));

return segments;
}

private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
final List<Interval> intervals,
final Integer limit
)
{
return derbyConnector.inReadOnlyTransaction(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(
handle,
derbyConnector,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
mapper
)
.retrieveUnusedSegments(DS.WIKI, intervals, limit)) {
return ImmutableList.copyOf(iterator);
}
}
);
}
}
Loading