Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mutable index using index SPI #10687

Merged
merged 16 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
Expand All @@ -52,7 +50,9 @@
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
Expand All @@ -77,22 +77,20 @@ public class MemoryEstimator {

private final TableConfig _tableConfig;
private final String _tableNameWithType;
private final Schema _schema;
private final File _sampleCompletedSegment;
private final long _sampleSegmentConsumedSeconds;
private final int _totalDocsInSampleSegment;
private final long _maxUsableHostMemory;
private final int _tableRetentionHours;

private SegmentMetadataImpl _segmentMetadata;
private long _sampleCompletedSegmentSizeBytes;
private Set<String> _invertedIndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>();
private Set<String> _varLengthDictionaryColumns = new HashSet<>();
private final SegmentMetadataImpl _segmentMetadata;
private final long _sampleCompletedSegmentSizeBytes;
int _avgMultiValues;

// Working dir will contain statsFile and also the generated segment if requested.
// It will get deleted after memory estimation is done.
private File _workingDir;
private final File _workingDir;

private String[][] _activeMemoryPerHost;
private String[][] _optimalSegmentSize;
Expand All @@ -103,11 +101,12 @@ public class MemoryEstimator {
/**
* Constructor used for processing the given completed segment
*/
public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, double ingestionRatePerPartition,
long maxUsableHostMemory, int tableRetentionHours, File workingDir) {
public MemoryEstimator(TableConfig tableConfig, Schema schema, File sampleCompletedSegment,
double ingestionRatePerPartition, long maxUsableHostMemory, int tableRetentionHours, File workingDir) {
_maxUsableHostMemory = maxUsableHostMemory;
_tableConfig = tableConfig;
_tableNameWithType = tableConfig.getTableName();
_schema = schema;
_sampleCompletedSegment = sampleCompletedSegment;
_tableRetentionHours = tableRetentionHours;

Expand All @@ -120,15 +119,6 @@ public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, dou
_totalDocsInSampleSegment = _segmentMetadata.getTotalDocs();
_sampleSegmentConsumedSeconds = (int) (_totalDocsInSampleSegment / ingestionRatePerPartition);

if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) {
_noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
}
if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns())) {
_varLengthDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns());
}
if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getInvertedIndexColumns())) {
_invertedIndexColumns.addAll(_tableConfig.getIndexingConfig().getInvertedIndexColumns());
}
_avgMultiValues = getAvgMultiValues();
_workingDir = workingDir;
}
Expand All @@ -139,7 +129,8 @@ public MemoryEstimator(TableConfig tableConfig, File sampleCompletedSegment, dou
public MemoryEstimator(TableConfig tableConfig, Schema schema, SchemaWithMetaData schemaWithMetadata,
int numberOfRows, double ingestionRatePerPartition, long maxUsableHostMemory, int tableRetentionHours,
File workingDir) {
this(tableConfig, generateCompletedSegment(schemaWithMetadata, schema, tableConfig, numberOfRows, workingDir),
this(tableConfig, schema,
generateCompletedSegment(schemaWithMetadata, schema, tableConfig, numberOfRows, workingDir),
ingestionRatePerPartition, maxUsableHostMemory, tableRetentionHours, workingDir);
}

Expand Down Expand Up @@ -167,13 +158,11 @@ public File initializeStatsHistory() {

// create a config
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
new RealtimeSegmentConfig.Builder(_tableConfig, _schema).setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
.setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs())
.setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
.setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
.setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
.setStatsHistory(sampleStatsHistory);
.setAvgNumMultiValues(_avgMultiValues).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
.setMemoryManager(memoryManager).setStatsHistory(sampleStatsHistory);

// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
Expand Down Expand Up @@ -256,12 +245,14 @@ public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, f
}

try {
int invertedColumnsCount = countInvertedColumns();

for (int i = 0; i < numHours.length; i++) {
int numHoursToConsume = numHours[i];
if (numHoursToConsume > retentionHours) {
continue;
}
long secondsToConsume = numHoursToConsume * 3600;
long secondsToConsume = numHoursToConsume * 3600L;
// consuming for _numHoursSampleSegmentConsumed, gives size sampleCompletedSegmentSizeBytes
// hence, consuming for numHoursToConsume would give:
long completedSegmentSizeBytes =
Expand All @@ -272,7 +263,8 @@ public void estimateMemoryUsed(File statsFile, int[] numHosts, int[] numHours, f
int totalDocs = (int) (((double) secondsToConsume / _sampleSegmentConsumedSeconds) * _totalDocsInSampleSegment);
long memoryForConsumingSegmentPerPartition = getMemoryForConsumingSegmentPerPartition(statsFile, totalDocs);

memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(memoryForConsumingSegmentPerPartition);
memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(
memoryForConsumingSegmentPerPartition, invertedColumnsCount);

int numActiveSegmentsPerPartition = (retentionHours + numHoursToConsume - 1) / numHoursToConsume;
long activeMemoryForCompletedSegmentsPerPartition =
Expand Down Expand Up @@ -329,11 +321,10 @@ private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalD
SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(_segmentMetadata, totalDocs);

RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
new RealtimeSegmentConfig.Builder(_tableConfig, _schema).setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
.setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
.setNoDictionaryColumns(_noDictionaryColumns).setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
.setInvertedIndexColumns(_invertedIndexColumns).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
.setSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
.setMemoryManager(memoryManager).setStatsHistory(statsHistory);

// create mutable segment impl
Expand Down Expand Up @@ -384,16 +375,23 @@ private int getAvgMultiValues() {
* @param totalMemoryForConsumingSegment
* @return
*/
private long getMemoryForInvertedIndex(long totalMemoryForConsumingSegment) {
private long getMemoryForInvertedIndex(long totalMemoryForConsumingSegment, int invertedColumnsCount) {
// TODO: better way to estimate inverted indexes memory utilization
long totalInvertedIndexSizeBytes = 0;
if (!_invertedIndexColumns.isEmpty()) {
if (invertedColumnsCount > 0) {
long memoryForEachColumn = totalMemoryForConsumingSegment / _segmentMetadata.getAllColumns().size();
totalInvertedIndexSizeBytes = (long) (memoryForEachColumn * 0.3 * _invertedIndexColumns.size());
totalInvertedIndexSizeBytes = (long) (memoryForEachColumn * 0.3 * invertedColumnsCount);
}
return totalInvertedIndexSizeBytes;
}

/**
 * Counts the columns whose inverted-index config (as resolved by the index SPI
 * from the table config and schema) is enabled.
 *
 * @return number of columns with an enabled inverted index
 */
private int countInvertedColumns() {
  Map<String, IndexConfig> configPerColumn = StandardIndexes.inverted().getConfig(_tableConfig, _schema);
  int enabledCount = 0;
  for (IndexConfig config : configPerColumn.values()) {
    if (config.isEnabled()) {
      enabledCount++;
    }
  }
  return enabledCount;
}

/**
* Creates a sample segment ZK metadata for the given segment metadata
* @param segmentMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,19 @@ public HLRealtimeSegmentDataManager(final SegmentZKMetadata segmentZKMetadata, f
// lets create a new realtime segment
_segmentLogger.info("Started {} stream provider", _streamConfig.getType());
final int capacity = _streamConfig.getFlushThresholdRows();
boolean nullHandlingEnabled = indexingConfig != null && indexingConfig.isNullHandlingEnabled();
RealtimeSegmentConfig realtimeSegmentConfig =
new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentName)
new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentName)
.setStreamName(_streamConfig.getTopicName()).setSchema(schema).setTimeColumnName(_timeColumnName)
.setCapacity(capacity).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setSegmentZKMetadata(segmentZKMetadata)
.setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation()).setMemoryManager(
getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentName,
indexLoadingConfig.isRealtimeOffHeapAllocation(),
indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), serverMetrics))
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
.setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
.setNullHandlingEnabled(nullHandlingEnabled).build();
_realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);

_notifier = realtimeTableDataManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,16 +1396,11 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
// Start new realtime segment
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
.setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(indexLoadingConfig.getInvertedIndexColumns())
.setTextIndexColumns(indexLoadingConfig.getTextIndexColumns())
.setFSTIndexColumns(indexLoadingConfig.getFSTIndexColumns())
.setJsonIndexConfigs(indexLoadingConfig.getJsonIndexConfigs())
.setH3IndexConfigs(indexLoadingConfig.getH3IndexConfigs()).setSegmentZKMetadata(segmentZKMetadata)
.setSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
.setAggregateMetrics(indexingConfig.isAggregateMetrics())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ private IndexSegment createRealtimeSegment(int index)

RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder()
.setTableNameWithType(REALTIME_TABLE_NAME).setSegmentName(segmentName).setSchema(SCHEMA).setCapacity(100000)
.setAvgNumMultiValues(2).setNoDictionaryColumns(Collections.emptySet())
.setJsonIndexConfigs(Collections.emptyMap()).setVarLengthDictionaryColumns(Collections.emptySet())
.setInvertedIndexColumns(Collections.emptySet()).setSegmentZKMetadata(new SegmentZKMetadata(segmentName))
.setAvgNumMultiValues(2).setSegmentZKMetadata(new SegmentZKMetadata(segmentName))
.setMemoryManager(new DirectMemoryManager(segmentName)).setStatsHistory(statsHistory).setAggregateMetrics(false)
.setNullHandlingEnabled(true).setIngestionAggregationConfigs(Collections.emptyList()).build();
MutableSegment mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfig, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -105,12 +108,15 @@ public void testNoRecordsIndexed()
String segmentName = "testTable__0__0__123456";
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();

DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true);

RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
.setAvgNumMultiValues(3).setNoDictionaryColumns(Sets.newHashSet(LONG_COLUMN2))
.setVarLengthDictionaryColumns(Sets.newHashSet(STRING_COLUMN3))
.setInvertedIndexColumns(Sets.newHashSet(STRING_COLUMN1))
.setAvgNumMultiValues(3)
.setIndex(Sets.newHashSet(LONG_COLUMN2), StandardIndexes.dictionary(), DictionaryIndexConfig.DISABLED)
.setIndex(Sets.newHashSet(Sets.newHashSet(STRING_COLUMN3)), StandardIndexes.dictionary(), varLengthDictConf)
.setIndex(Sets.newHashSet(STRING_COLUMN1), StandardIndexes.inverted(), IndexConfig.ENABLED)
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
.setMemoryManager(new DirectMemoryManager(segmentName))
.setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
*/
package org.apache.pinot.integration.tests;

import com.google.common.base.Function;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -603,17 +603,9 @@ protected void waitForAllDocsLoaded(long timeoutMs)

protected void waitForDocsLoaded(long timeoutMs, boolean raiseError, String tableName) {
final long countStarResult = getCountStarResult();
TestUtils.waitForCondition(new Function<Void, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable Void aVoid) {
try {
return getCurrentCountStarResult(tableName) == countStarResult;
} catch (Exception e) {
return null;
}
}
}, 100L, timeoutMs, String.format("Failed to load %d documents", countStarResult), raiseError);
TestUtils.waitForCondition(
() -> getCurrentCountStarResult(tableName) == countStarResult, 100L, timeoutMs,
"Failed to load " + countStarResult + " documents", raiseError, Duration.ofMillis(timeoutMs / 10));
}

/**
Expand Down
Loading