Skip to content

Commit

Permalink
Batch segment retrieval from the metadata store (apache#15305)
Browse files Browse the repository at this point in the history
* Add a unit test that fails when used segments with too many intervals are retrieved.

- This is a failing test case that needs to be ignored.

* Batch the intervals (use 100 as it's consistent with batching in other places).

* move the filtering inside the batch

* Account for the limit across batch splits.

* Adjustments

* Fixup and add tests

* small refactor

* add more tests.

* remove wrapper.

* Minor edits

* assert out of range
  • Loading branch information
abhishekrb19 authored and ycp2 committed Nov 17, 2023
1 parent 8661c47 commit 4663a5a
Show file tree
Hide file tree
Showing 2 changed files with 309 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -40,6 +42,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -56,6 +59,13 @@ public class SqlSegmentsMetadataQuery
{
private static final Logger log = new Logger(SqlSegmentsMetadataQuery.class);

/**
 * Maximum number of intervals to consider for a single batch. When intervals are provided,
 * {@code retrieveSegments} partitions them into sub-lists of at most this size and retrieves the
 * segments for each sub-list separately.
 * This is similar to {@link IndexerSQLMetadataStorageCoordinator#MAX_NUM_SEGMENTS_TO_ANNOUNCE_AT_ONCE}, but imposed
 * on the number of intervals.
 */
private static final int MAX_INTERVALS_PER_BATCH = 100;

private final Handle handle;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig dbTables;
Expand Down Expand Up @@ -344,6 +354,42 @@ private CloseableIterator<DataSegment> retrieveSegments(
final boolean used,
@Nullable final Integer limit
)
{
if (intervals.isEmpty()) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;

for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
final List<DataSegment> dataSegments = ImmutableList.copyOf(iterator);
resultingIterators.add(dataSegments.iterator());
if (dataSegments.size() >= limitPerBatch) {
break;
}
limitPerBatch -= dataSegments.size();
} else {
resultingIterators.add(iterator);
}
}
return CloseableIterators.withEmptyBaggage(Iterators.concat(resultingIterators.iterator()));
}
}

private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final String dataSource,
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings);
Expand Down Expand Up @@ -372,27 +418,24 @@ private CloseableIterator<DataSegment> retrieveSegments(
sql.map((index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class))
.iterator();

return CloseableIterators.wrap(
Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return Iterators.filter(
resultIterator,
dataSegment -> {
if (intervals.isEmpty()) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
} else {
// Must re-check that the interval matches, even if comparing as string, because the *segment interval*
// might not be string-comparable. (Consider a query interval like "2000-01-01/3000-01-01" and a
// segment interval like "20010/20011".)
for (Interval interval : intervals) {
if (matchMode.apply(interval, dataSegment.getInterval())) {
return true;
}
}

return false;
}
}
),
resultIterator

return false;
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -69,6 +70,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -1062,6 +1064,213 @@ public void testMultiIntervalUsedList() throws IOException
).containsOnlyOnce(defaultSegment3);
}

/**
 * Commits one used segment per year for 1900 through 2132 and then asks for used segments with one
 * query interval per segment. Every committed segment must come back exactly once.
 */
@Test
public void testRetrieveUsedSegmentsUsingMultipleIntervals() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);

  // One query interval per committed segment, in the same order as the segments.
  final List<Interval> queryIntervals = new ArrayList<>();
  for (DataSegment segment : committed) {
    queryIntervals.add(segment.getInterval());
  }

  final Collection<DataSegment> retrieved =
      coordinator.retrieveUsedSegmentsForIntervals(DS.WIKI, queryIntervals, Segments.ONLY_VISIBLE);

  Assert.assertEquals(committed.size(), retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Queries used segments with an interval that lies entirely outside the committed segments'
 * intervals and expects an empty result.
 */
@Test
public void testRetrieveAllUsedSegmentsUsingIntervalsOutOfRange() throws IOException
{
  final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);

  final Interval outOfRangeInterval = Intervals.of("1700/1800");
  // Precondition: NO committed segment may overlap the query interval. Checking that merely *some*
  // segment does not overlap would let the precondition pass even when the fixture is wrong.
  Assert.assertTrue(
      segments.stream().noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval))
  );

  final Collection<DataSegment> actualUsedSegments = coordinator.retrieveUsedSegmentsForIntervals(
      DS.WIKI,
      ImmutableList.of(outOfRangeInterval),
      Segments.ONLY_VISIBLE
  );

  Assert.assertEquals(0, actualUsedSegments.size());
}

/**
 * Retrieves all used segments of the datasource without passing any interval and expects every
 * committed segment to be returned.
 */
@Test
public void testRetrieveAllUsedSegmentsUsingNoIntervals() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);

  final Collection<DataSegment> retrieved =
      coordinator.retrieveAllUsedSegments(DS.WIKI, Segments.ONLY_VISIBLE);

  final int expectedCount = committed.size();
  Assert.assertEquals(expectedCount, retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Marks all committed segments unused, then retrieves unused segments for one wide interval with a
 * null limit. All segments must be returned.
 */
@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndNoLimit() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  // "1900/3000" spans every yearly segment created above.
  final Interval wideInterval = Intervals.of("1900/3000");
  final List<DataSegment> retrieved =
      coordinator.retrieveUnusedSegmentsForInterval(DS.WIKI, wideInterval, null);

  Assert.assertEquals(committed.size(), retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Marks all committed segments unused, then retrieves unused segments for one wide interval with a
 * limit equal to the number of segments. All segments must be returned.
 */
@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitAtRange() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  // The limit is exactly the number of available unused segments.
  final int limitAtRange = committed.size();
  final Interval wideInterval = Intervals.of("1900/3000");
  final List<DataSegment> retrieved =
      coordinator.retrieveUnusedSegmentsForInterval(DS.WIKI, wideInterval, limitAtRange);

  Assert.assertEquals(limitAtRange, retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Marks all committed segments unused, then retrieves unused segments for one wide interval with a
 * limit one smaller than the number of segments. The result must hold exactly that many segments,
 * namely the leading ones of the committed list.
 */
@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitInRange() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  final int limitInRange = committed.size() - 1;
  final Interval wideInterval = Intervals.of("1900/3000");
  final List<DataSegment> retrieved =
      coordinator.retrieveUnusedSegmentsForInterval(DS.WIKI, wideInterval, limitInRange);

  Assert.assertEquals(limitInRange, retrieved.size());

  // Expect the first `limitInRange` committed segments to be the ones returned.
  final List<DataSegment> expectedPrefix = committed.subList(0, limitInRange);
  Assert.assertTrue(retrieved.containsAll(expectedPrefix));
}

/**
 * Marks all committed segments unused, then retrieves unused segments for one wide interval with a
 * limit one larger than the number of segments. All segments, and no more, must be returned.
 */
@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalAndLimitOutOfRange() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  // Ask for more segments than exist.
  final int limitBeyondRange = committed.size() + 1;
  final Interval wideInterval = Intervals.of("1900/3000");
  final List<DataSegment> retrieved =
      coordinator.retrieveUnusedSegmentsForInterval(DS.WIKI, wideInterval, limitBeyondRange);

  Assert.assertEquals(committed.size(), retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Marks all committed segments unused, then queries unused segments with a single interval that
 * lies entirely outside the segments' intervals and expects an empty result.
 */
@Test
public void testRetrieveUnusedSegmentsUsingSingleIntervalOutOfRange() throws IOException
{
  final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
  markAllSegmentsUnused(new HashSet<>(segments));

  final Interval outOfRangeInterval = Intervals.of("1700/1800");
  // Precondition: NO segment may overlap the query interval. Checking that merely *some* segment
  // does not overlap would let the precondition pass even when the fixture is wrong.
  Assert.assertTrue(
      segments.stream().noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval))
  );
  // The limit is larger than the segment count so that it cannot be the reason for an empty result.
  final int limit = segments.size() + 1;

  final List<DataSegment> actualUnusedSegments = coordinator.retrieveUnusedSegmentsForInterval(
      DS.WIKI,
      outOfRangeInterval,
      limit
  );
  Assert.assertEquals(0, actualUnusedSegments.size());
}

/**
 * Marks all committed segments unused, then retrieves unused segments with one query interval per
 * segment and a null limit. Every segment must be returned exactly once.
 */
@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws IOException
{
  final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(segments));

  final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
      segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
      null
  );
  Assert.assertEquals(segments.size(), actualUnusedSegments.size());
  // Check that the result contains every expected segment. Together with the size check this rules
  // out a result that repeats one segment and drops another, which the reverse containment
  // (expected contains all of actual) would not detect.
  Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

/**
 * Marks all committed segments unused, then retrieves unused segments with one query interval per
 * segment and a limit equal to the number of segments. Every segment must be returned exactly once.
 */
@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException
{
  final List<DataSegment> segments = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(segments));

  final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
      segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
      segments.size()
  );
  Assert.assertEquals(segments.size(), actualUnusedSegments.size());
  // Check that the result contains every expected segment. Together with the size check this rules
  // out a result that repeats one segment and drops another, which the reverse containment
  // (expected contains all of actual) would not detect.
  Assert.assertTrue(actualUnusedSegments.containsAll(segments));
}

/**
 * Marks all committed segments unused, then retrieves unused segments using the intervals of all
 * but the last committed segment, with a limit equal to that number of intervals. Exactly those
 * segments must be returned.
 */
@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  final int limitInRange = committed.size() - 1;
  // Only the leading `limitInRange` segments contribute a query interval.
  final List<DataSegment> expectedPrefix = committed.subList(0, limitInRange);
  final List<Interval> queryIntervals = new ArrayList<>();
  for (DataSegment segment : expectedPrefix) {
    queryIntervals.add(segment.getInterval());
  }

  final ImmutableList<DataSegment> retrieved =
      retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(queryIntervals, limitInRange);

  Assert.assertEquals(limitInRange, retrieved.size());
  Assert.assertTrue(retrieved.containsAll(expectedPrefix));
}

/**
 * Marks all committed segments unused, then retrieves unused segments with one query interval per
 * segment and a limit one larger than the number of segments. All segments must be returned.
 */
@Test
public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws IOException
{
  final List<DataSegment> committed = createAndGetUsedYearSegments(1900, 2133);
  markAllSegmentsUnused(new HashSet<>(committed));

  final List<Interval> queryIntervals = new ArrayList<>();
  for (DataSegment segment : committed) {
    queryIntervals.add(segment.getInterval());
  }
  // Ask for more segments than exist.
  final int limitBeyondRange = committed.size() + 1;

  final ImmutableList<DataSegment> retrieved =
      retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(queryIntervals, limitBeyondRange);

  Assert.assertEquals(committed.size(), retrieved.size());
  Assert.assertTrue(retrieved.containsAll(committed));
}

/**
 * Marks all committed segments unused, then queries unused segments through the multi-interval
 * helper with one interval that lies entirely outside the segments' intervals and a null limit.
 * The result must be empty.
 */
@Test
public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOException
{
  final List<DataSegment> segments = createAndGetUsedYearSegments(1905, 1910);
  markAllSegmentsUnused(new HashSet<>(segments));

  final Interval outOfRangeInterval = Intervals.of("1700/1800");
  // Precondition: NO segment may overlap the query interval. Checking that merely *some* segment
  // does not overlap would let the precondition pass even when the fixture is wrong.
  Assert.assertTrue(
      segments.stream().noneMatch(segment -> segment.getInterval().overlaps(outOfRangeInterval))
  );

  final ImmutableList<DataSegment> actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
      ImmutableList.of(outOfRangeInterval),
      null
  );
  Assert.assertEquals(0, actualUnusedSegments.size());
}

@Test
public void testSimpleUnusedList() throws IOException
{
Expand Down Expand Up @@ -2713,4 +2922,43 @@ private DataSegment createSegment(Interval interval, String version, ShardSpec s
.size(100)
.build();
}

/**
 * Creates one segment per year, each covering the interval {@code year/(year + 1)}, commits them
 * through the coordinator and asserts that the commit result contains all of them.
 *
 * @param startYear first year to create a segment for (inclusive)
 * @param endYear   year at which to stop (exclusive)
 * @return the created segments, in ascending year order
 */
private List<DataSegment> createAndGetUsedYearSegments(final int startYear, final int endYear) throws IOException
{
  final List<DataSegment> yearSegments = new ArrayList<>();

  int year = startYear;
  while (year < endYear) {
    final Interval yearInterval = Intervals.of("%d/%d", year, year + 1);
    yearSegments.add(createSegment(yearInterval, "version", new LinearShardSpec(0)));
    year++;
  }

  final Set<DataSegment> toCommit = new HashSet<>(yearSegments);
  final Set<DataSegment> committed = coordinator.commitSegments(toCommit);
  Assert.assertTrue(committed.containsAll(new HashSet<>(yearSegments)));

  return yearSegments;
}

/**
 * Retrieves the unused segments of {@code DS.WIKI} for the given intervals by running
 * {@code SqlSegmentsMetadataQuery#retrieveUnusedSegments} inside a read-only transaction and
 * copying the results into an immutable list before the iterator is closed.
 *
 * @param intervals intervals to query
 * @param limit     limit passed through to the query; the no-limit tests pass {@code null}
 * @return the retrieved unused segments
 */
private ImmutableList<DataSegment> retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit(
    final List<Interval> intervals,
    final Integer limit
)
{
  return derbyConnector.inReadOnlyTransaction(
      (handle, status) -> {
        final SqlSegmentsMetadataQuery query = SqlSegmentsMetadataQuery.forHandle(
            handle,
            derbyConnector,
            derbyConnectorRule.metadataTablesConfigSupplier().get(),
            mapper
        );
        // Drain the iterator while it is still open; try-with-resources closes it afterwards.
        try (final CloseableIterator<DataSegment> unusedSegments =
                 query.retrieveUnusedSegments(DS.WIKI, intervals, limit)) {
          return ImmutableList.copyOf(unusedSegments);
        }
      }
  );
}
}

0 comments on commit 4663a5a

Please sign in to comment.