Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail MSQ compaction if multi-valued partition dimensions are found #17344

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
Expand Down Expand Up @@ -84,6 +85,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -130,7 +132,7 @@ public MSQCompactionRunner(
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>'range' partitionsSpec with multi-valued or non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
Expand All @@ -153,13 +155,24 @@ public CompactionConfigValidationResult validateCompactionTask(
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
DataSchema dataSchema = Iterables.getOnlyElement(intervalToDataSchemaMap.values());
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
if (compactionTask.getTuningConfig() != null) {
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
dataSchema.getDimensionsSpec().getDimensions()
)
);
validationResults.add(
validatePartitionDimensionsAreNotMultiValued(
compactionTask.getTuningConfig().getPartitionsSpec(),
dataSchema.getDimensionsSpec(),
dataSchema instanceof CombinedDataSchema
? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
: null
)
);

}
if (compactionTask.getGranularitySpec() != null) {
validationResults.add(ClientCompactionRunnerInfo.validateRollupForMSQ(
Expand All @@ -175,6 +188,32 @@ public CompactionConfigValidationResult validateCompactionTask(
.orElse(CompactionConfigValidationResult.success());
}

/**
 * Rejects a 'range' partitionsSpec that partitions on a dimension known to be multi-valued.
 *
 * @param partitionsSpec        partitions spec from the tuning config; only a
 *                              {@link DimensionRangePartitionsSpec} is inspected
 * @param dimensionsSpec        dimensions spec of the data schema being compacted
 * @param multiValuedDimensions names of dimensions known to be multi-valued; a null or empty set
 *                              means there is nothing to check
 * @return a failure naming one offending partition dimension, otherwise success
 */
private CompactionConfigValidationResult validatePartitionDimensionsAreNotMultiValued(
    PartitionsSpec partitionsSpec,
    DimensionsSpec dimensionsSpec,
    Set<String> multiValuedDimensions
)
{
  final List<String> dimensionNames = dimensionsSpec.getDimensionNames();

  // Only a range spec with at least one known multi-valued dimension can violate the constraint.
  final boolean nothingToCheck = !(partitionsSpec instanceof DimensionRangePartitionsSpec)
                                 || dimensionNames == null
                                 || multiValuedDimensions == null
                                 || multiValuedDimensions.isEmpty();
  if (nothingToCheck) {
    return CompactionConfigValidationResult.success();
  }

  final DimensionRangePartitionsSpec rangeSpec = (DimensionRangePartitionsSpec) partitionsSpec;
  return rangeSpec.getPartitionDimensions()
                  .stream()
                  .filter(multiValuedDimensions::contains)
                  .findAny()
                  .map(
                      offendingDimension -> CompactionConfigValidationResult.failure(
                          "MSQ: Multi-valued string partition dimension[%s] not supported with 'range' partition spec",
                          offendingDimension
                      )
                  )
                  .orElseGet(CompactionConfigValidationResult::success);
}

@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
Expand Down Expand Up @@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
null,
null,
null,
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
Expand All @@ -139,6 +144,7 @@ public void testMultipleDisjointCompactionIntervalsAreInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
Expand All @@ -160,6 +166,7 @@ public void testHashedPartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -173,6 +180,7 @@ public void testStringDimensionInRangePartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -187,6 +195,7 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);

Expand All @@ -200,6 +209,29 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
);
}

/**
 * Range partitioning on the dimension registered as multi-valued in {@code INTERVAL_DATASCHEMAS}
 * must fail validation with a message naming that dimension.
 */
@Test
public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
{
  // Partition solely on the multi-valued string dimension.
  final DimensionRangePartitionsSpec rangeSpecOnMvDimension = new DimensionRangePartitionsSpec(
      TARGET_ROWS_PER_SEGMENT,
      null,
      Collections.singletonList(MV_STRING_DIMENSION.getName()),
      false
  );
  final CompactionTask task = createCompactionTask(
      rangeSpecOnMvDimension,
      null,
      Collections.emptyMap(),
      null,
      null,
      null
  );

  final CompactionConfigValidationResult result =
      MSQ_COMPACTION_RUNNER.validateCompactionTask(task, INTERVAL_DATASCHEMAS);

  Assert.assertFalse(result.isValid());
  Assert.assertEquals(
      "MSQ: Multi-valued string partition dimension[mv_string_dim] not supported with 'range' partition spec",
      result.getReason()
  );
}

@Test
public void testMaxTotalRowsIsInvalid()
{
Expand All @@ -208,6 +240,7 @@ public void testMaxTotalRowsIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -221,6 +254,7 @@ public void testDynamicPartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -234,6 +268,7 @@ public void testQueryGranularityAllIsValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
Expand All @@ -247,26 +282,28 @@ public void testRollupFalseWithMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, false),
null,
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testRollupTrueWithoutMetricsSpecIsInValid()
public void testRollupTrueWithoutMetricsSpecIsInvalid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having different input and output column names are unsupported.
final String inputColName = "added";
Expand All @@ -276,6 +313,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
Expand All @@ -292,7 +330,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
@Test
public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
{
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null);
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null, null);
TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.emptyMap(), null);
Assert.assertTrue(taskStatus.isFailure());
}
Expand All @@ -307,6 +345,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

Expand Down Expand Up @@ -384,6 +423,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

Expand Down Expand Up @@ -481,6 +521,7 @@ private CompactionTask createCompactionTask(
@Nullable DimFilter dimFilter,
Map<String, Object> contextParams,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable List<DimensionSchema> dimensionSchemas,
@Nullable AggregatorFactory[] metricsSpec
)
{
Expand All @@ -504,6 +545,7 @@ private CompactionTask createCompactionTask(
))
.transformSpec(transformSpec)
.granularitySpec(granularitySpec)
.dimensionsSpec(new DimensionsSpec(dimensionSchemas))
.metricsSpec(metricsSpec)
.compactionRunner(MSQ_COMPACTION_RUNNER)
.context(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
Expand Down Expand Up @@ -80,6 +81,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -452,6 +454,50 @@ public boolean isPerfectRollup()
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}

/**
 * Decides whether multi-valued string dimensions have to be identified by analyzing the segments.
 * <p>
 * Always false for the native engine. For any other runner (MSQ), the answer is true when at least
 * one of these holds:
 * <ul>
 * <li> Rollup is possible and involves a string dimension, or the dimensions are unknown
 * (MSQ requires multi-valued string dimensions to be converted to arrays for rollup). </li>
 * <li> Range partitioning is configured on a string dimension, or the dimensions are unknown
 * (MSQ does not support partitioning on a multi-valued string dimension). </li>
 * </ul>
 * Dimensions are treated as unknown when no dimensionsSpec is given or it lists no dimensions.
 *
 * @return false for native engine, true for MSQ engine only when partitioning or rollup is done on a string
 * or unknown dimension.
 */
boolean identifyMultiValuedDimensions()
{
  if (compactionRunner instanceof NativeCompactionRunner) {
    return false;
  }

  // A missing granularitySpec (or unset rollup flag) leaves rollup to be decided by segment analysis,
  // so only an explicit rollup=false rules it out.
  final boolean mayRollup = granularitySpec == null || !Boolean.FALSE.equals(granularitySpec.isRollup());
  final PartitionsSpec partitionsSpec = tuningConfig == null ? null : tuningConfig.getPartitionsSpec();
  final boolean rangePartitioned = partitionsSpec instanceof DimensionRangePartitionsSpec;

  // Unknown dimensions: any of them could turn out to be a multi-valued string.
  if (dimensionsSpec == null || dimensionsSpec.getDimensions().isEmpty()) {
    return mayRollup || rangePartitioned;
  }

  final boolean rollupOnStringDimension =
      mayRollup
      && dimensionsSpec.getDimensions()
                       .stream()
                       .anyMatch(dim -> ColumnType.STRING.equals(dim.getColumnType()));
  if (rollupOnStringDimension) {
    return true;
  }

  if (!rangePartitioned) {
    return false;
  }

  // Range partitioning matters only if a declared string dimension is one of the partition dimensions.
  final List<String> partitionDimensions =
      ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
  return dimensionsSpec.getDimensions()
                       .stream()
                       .anyMatch(
                           dim -> ColumnType.STRING.equals(dim.getColumnType())
                                  && partitionDimensions.contains(dim.getName())
                       );
}

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Expand All @@ -466,7 +512,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
metricsSpec,
granularitySpec,
getMetricBuilder(),
!(compactionRunner instanceof NativeCompactionRunner)
this.identifyMultiValuedDimensions()
);

registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
Expand Down Expand Up @@ -794,23 +840,25 @@ static class ExistingSegmentAnalyzer
this.needMultiValuedDimensions = needMultiValuedDimensions;
}

private boolean shouldFetchSegments()
/**
* Segments are downloaded even when just needMultiValuedDimensions=true since MSQ switches to dynamic partitioning
* on finding any 'range' partition dimension to be multivalued at runtime, which ends up in a mismatch between
* the compaction config and the actual segments (lastCompactionState), leading to repeated compactions.
*/
private boolean shouldDownloadSegments()
{
// Don't fetch segments for just needMultiValueDimensions
return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec;

return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec || needMultiValuedDimensions;
}

public void fetchAndProcessIfNeeded()
{
if (!shouldFetchSegments()) {
if (!shouldDownloadSegments()) {
// Nothing to do; short-circuit and don't fetch segments.
return;
}

if (needMultiValuedDimensions) {
multiValuedDimensions = new HashSet<>();
}

multiValuedDimensions = new HashSet<>();
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments = sortSegmentsListNewestFirst();

for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> segmentPair : segments) {
Expand Down
Loading
Loading