Skip to content

Commit

Permalink
Use intermediate-persist IndexSpec during multiphase merge. (#11940)
Browse files Browse the repository at this point in the history
* Use intermediate-persist IndexSpec during multiphase merge.

The main change is the addition of an intermediate-persist IndexSpec
to the main "merge" method in IndexMerger. There are also a few minor
adjustments to the IndexMerger interface to encourage more harmonious
usage of its methods in the future.

* Additional changes inspired by the test coverage checker.

- Remove unused-in-production IndexMerger methods "append" and "convert".
- Add additional unit tests to UnifiedIndexerAppenderatorsManager.

* Additional adjustments.

* Even more additional adjustments.

* Test fixes.
  • Loading branch information
gianm authored Nov 29, 2021
1 parent f536f31 commit f6e6ca2
Show file tree
Hide file tree
Showing 19 changed files with 412 additions and 1,701 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,13 @@ public static List<DataSegmentAndIndexZipFilePath> getPublishedSegmentAndIndexZi
FileSystem fs = descriptorInfoDir.getFileSystem(conf);

for (FileStatus status : fs.listStatus(descriptorInfoDir)) {
final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(status.getPath()), DataSegmentAndIndexZipFilePath.class);
final DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath = jsonMapper.readValue((InputStream) fs.open(
status.getPath()), DataSegmentAndIndexZipFilePath.class);
publishedSegmentAndIndexZipFilePathsBuilder.add(segmentAndIndexZipFilePath);
log.info("Adding segment %s to the list of published segments", segmentAndIndexZipFilePath.getSegment().getId());
log.info(
"Adding segment %s to the list of published segments",
segmentAndIndexZipFilePath.getSegment().getId()
);
}
}
catch (FileNotFoundException e) {
Expand Down Expand Up @@ -303,10 +307,10 @@ private static IncrementalIndex makeIncrementalIndex(

// Build the incremental-index according to the spec that was chosen by the user
IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getMaxRowsInMemory())
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
.build();
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getMaxRowsInMemory())
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
.build();

if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
Expand Down Expand Up @@ -609,7 +613,18 @@ protected File mergeQueryableIndex(
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
.mergeQueryableIndex(indexes, rollup, aggs, null, file, config.getIndexSpec(), progressIndicator, null, -1);
.mergeQueryableIndex(
indexes,
rollup,
aggs,
null,
file,
config.getIndexSpec(),
config.getIndexSpecForIntermediatePersists(),
progressIndicator,
null,
-1
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.BaseProgressIndicator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
Expand Down Expand Up @@ -339,8 +340,11 @@ private static Pair<File, List<String>> mergeSegmentsInSamePartition(
indexesToMerge,
dataSchema.getGranularitySpec().isRollup(),
dataSchema.getAggregators(),
null,
outDir,
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
new BaseProgressIndicator(),
tuningConfig.getSegmentWriteOutMediumFactory(),
tuningConfig.getMaxColumnsToMerge()
)
Expand Down
114 changes: 76 additions & 38 deletions processing/src/main/java/org/apache/druid/segment/IndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,33 +188,60 @@ static <T extends Comparable<? super T>> ArrayList<T> mergeIndexed(List<Iterable
return Lists.newArrayList(retVal);
}

File persist(
/**
* Equivalent to {@link #persist(IncrementalIndex, Interval, File, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory)}
* without a progress indicator and with interval set to {@link IncrementalIndex#getInterval()}.
*/
default File persist(
IncrementalIndex index,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;
) throws IOException
{
return persist(
index,
index.getInterval(),
outDir,
indexSpec,
new BaseProgressIndicator(),
segmentWriteOutMediumFactory
);
}

/**
* This is *not* thread-safe and havok will ensue if this is called and writes are still occurring
* on the IncrementalIndex object.
*
* @param index the IncrementalIndex to persist
* @param dataInterval the Interval that the data represents
* @param outDir the directory to persist the data to
*
* @return the index output directory
*
* @throws IOException if an IO error occurs persisting the index
* Equivalent to {@link #persist(IncrementalIndex, Interval, File, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory)}
* without a progress indicator.
*/
File persist(
default File persist(
IncrementalIndex index,
Interval dataInterval,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;
) throws IOException
{
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory);
}

/**
* Persist an IncrementalIndex to disk in such a way that it can be loaded back up as a {@link QueryableIndex}.
*
* This is *not* thread-safe and havoc will ensue if this is called and writes are still occurring on the
* IncrementalIndex object.
*
* @param index the IncrementalIndex to persist
* @param dataInterval the Interval that the data represents. Typically, this is the same as the
* interval from the corresponding {@link org.apache.druid.timeline.SegmentId}.
* @param outDir the directory to persist the data to
* @param indexSpec storage and compression options
* @param progress an object that will receive progress updates
* @param segmentWriteOutMediumFactory controls allocation of temporary data structures
*
* @return the index output directory
*
* @throws IOException if an IO error occurs persisting the index
*/
File persist(
IncrementalIndex index,
Interval dataInterval,
Expand All @@ -224,39 +251,61 @@ File persist(
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;

File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
int maxColumnsToMerge
) throws IOException;

File mergeQueryableIndex(
/**
* Merge a collection of {@link QueryableIndex}.
*
* Only used as a convenience method in tests. In production code, use the full version
* {@link #mergeQueryableIndex(List, boolean, AggregatorFactory[], DimensionsSpec, File, IndexSpec, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory, int)}.
*/
@VisibleForTesting
default File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
AggregatorFactory[] metricAggs,
@Nullable DimensionsSpec dimensionsSpec,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
int maxColumnsToMerge
) throws IOException;
) throws IOException
{
return mergeQueryableIndex(
indexes,
rollup,
metricAggs,
null,
outDir,
indexSpec,
indexSpec,
new BaseProgressIndicator(),
segmentWriteOutMediumFactory,
maxColumnsToMerge
);
}

/**
* Merge a collection of {@link QueryableIndex}.
*/
File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
AggregatorFactory[] metricAggs,
@Nullable DimensionsSpec dimensionsSpec,
File outDir,
IndexSpec indexSpec,
IndexSpec indexSpecForIntermediatePersists,
ProgressIndicator progress,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
int maxColumnsToMerge
) throws IOException;

/**
* Only used as a convenience method in tests.
*
* In production code, to merge multiple {@link QueryableIndex}, use
* {@link #mergeQueryableIndex(List, boolean, AggregatorFactory[], DimensionsSpec, File, IndexSpec, IndexSpec, ProgressIndicator, SegmentWriteOutMediumFactory, int)}.
* To merge multiple {@link IncrementalIndex}, call one of the {@link #persist} methods and then merge the resulting
* {@link QueryableIndex}.
*/
@VisibleForTesting
File merge(
List<IndexableAdapter> indexes,
Expand All @@ -267,17 +316,6 @@ File merge(
int maxColumnsToMerge
) throws IOException;

// Faster than IndexMaker
File convert(File inDir, File outDir, IndexSpec indexSpec) throws IOException;

File append(
List<IndexableAdapter> indexes,
AggregatorFactory[] aggregators,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;

/**
* This method applies {@link DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues(int, ColumnValueSelector)} to
* all dimension column selectors of the given sourceRowIterator, using the given index number.
Expand Down
Loading

0 comments on commit f6e6ca2

Please sign in to comment.