Fail MSQ compaction if multi-valued partition dimensions are found #17344

Merged
@@ -29,6 +29,7 @@
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -84,6 +85,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -130,7 +132,7 @@ public MSQCompactionRunner(
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>'range' partitionsSpec with multi-valued or non-string partition dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
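
For illustration (not part of this diff), a minimal sketch of a 'range' spec that the new check rejects; the class name, dimension name, and row target are hypothetical, but the constructor matches the one used in the tests below:

```java
import java.util.Collections;

import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;

class RejectedRangeSpecExample
{
  // Hypothetical: "tags" is a multi-valued string dimension in the datasource.
  // Partitioning 'range' on it now fails MSQ compaction validation up front,
  // instead of letting MSQ silently fall back to dynamic partitioning.
  static PartitionsSpec rejectedSpec()
  {
    return new DimensionRangePartitionsSpec(
        5000,                               // targetRowsPerSegment (arbitrary)
        null,                               // maxRowsPerSegment
        Collections.singletonList("tags"),  // multi-valued partition dimension
        false                               // assumeGrouped
    );
  }
}
```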
@@ -153,13 +155,23 @@ public CompactionConfigValidationResult validateCompactionTask(
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
DataSchema dataSchema = Iterables.getOnlyElement(intervalToDataSchemaMap.values());
if (compactionTask.getTuningConfig() != null) {
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
dataSchema.getDimensionsSpec().getDimensions()
)
);
validationResults.add(
validatePartitionDimensionsNotMultivalued(
compactionTask.getTuningConfig().getPartitionsSpec(),
dataSchema.getDimensionsSpec(),
dataSchema instanceof CombinedDataSchema
? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
: null
));

}
if (compactionTask.getGranularitySpec() != null) {
validationResults.add(ClientCompactionRunnerInfo.validateRollupForMSQ(
@@ -175,6 +187,31 @@ public CompactionConfigValidationResult validateCompactionTask(
.orElse(CompactionConfigValidationResult.success());
}

private CompactionConfigValidationResult validatePartitionDimensionsNotMultivalued(
PartitionsSpec partitionsSpec,
DimensionsSpec dimensionsSpec,
Set<String> multiValuedDimensions
)
{
List<String> dimensionSchemas = dimensionsSpec.getDimensionNames();
if (partitionsSpec instanceof DimensionRangePartitionsSpec
&& dimensionSchemas != null
&& multiValuedDimensions != null) {
Optional<String> multiValuedDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
.getPartitionDimensions()
.stream()
.filter(multiValuedDimensions::contains)
.findAny();
if (multiValuedDimension.isPresent()) {
return CompactionConfigValidationResult.failure(
"MSQ: Multi-valued string partition dimension[%s] not supported with 'range' partition spec",
multiValuedDimension.get()
);
}
}
return CompactionConfigValidationResult.success();
}

@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
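The tail of validateCompactionTask is cut off by the diff above; a sketch (assumed from the visible `.orElse(...)` call, not shown verbatim in this PR) of the first-failure aggregation it ends with:

```java
// Sketch: the first failing validation result wins; if every check passes,
// the compaction task is considered valid for the MSQ engine.
return validationResults
    .stream()
    .filter(result -> !result.isValid())
    .findFirst()
    .orElse(CompactionConfigValidationResult.success());
```
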
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
null,
null,
null,
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
@@ -139,6 +144,7 @@ public void testMultipleDisjointCompactionIntervalsAreInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -160,6 +166,7 @@ public void testHashedPartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
@@ -173,6 +180,7 @@ public void testStringDimensionInRangePartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
@@ -187,6 +195,7 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);

@@ -200,6 +209,29 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
);
}

@Test
public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
{
List<String> mvStringPartitionDimension = Collections.singletonList(MV_STRING_DIMENSION.getName());
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, mvStringPartitionDimension, false),
null,
Collections.emptyMap(),
null,
null,
null
);

CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Multi-valued string partition dimension[mv_string_dim] not supported with 'range' partition spec",
validationResult.getReason()
);
}

@Test
public void testMaxTotalRowsIsInvalid()
{
@@ -208,6 +240,7 @@ public void testMaxTotalRowsIsInvalid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
@@ -221,6 +254,7 @@ public void testDynamicPartitionsSpecIsValid()
null,
Collections.emptyMap(),
null,
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
@@ -234,6 +268,7 @@ public void testQueryGranularityAllIsValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
@@ -247,26 +282,28 @@ public void testRollupFalseWithMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, false),
null,
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testRollupTrueWithoutMetricsSpecIsInValid()
public void testRollupTrueWithoutMetricsSpecIsInvalid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
}

@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having different input and output column names are unsupported.
final String inputColName = "added";
@@ -276,6 +313,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
null,
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
);
CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -292,7 +330,7 @@ public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
@Test
public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
{
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null);
CompactionTask compactionTask = createCompactionTask(null, null, Collections.emptyMap(), null, null, null);
TaskStatus taskStatus = MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, Collections.emptyMap(), null);
Assert.assertTrue(taskStatus.isFailure());
}
@@ -307,6 +345,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

@@ -384,6 +423,7 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess
dimFilter,
Collections.emptyMap(),
null,
null,
null
);

@@ -481,6 +521,7 @@ private CompactionTask createCompactionTask(
@Nullable DimFilter dimFilter,
Map<String, Object> contextParams,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
@Nullable List<DimensionSchema> dimensionSchemas,
@Nullable AggregatorFactory[] metricsSpec
)
{
@@ -504,6 +545,7 @@ private CompactionTask createCompactionTask(
))
.transformSpec(transformSpec)
.granularitySpec(granularitySpec)
.dimensionsSpec(new DimensionsSpec(dimensionSchemas))
.metricsSpec(metricsSpec)
.compactionRunner(MSQ_COMPACTION_RUNNER)
.context(context);
@@ -48,6 +48,7 @@
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -80,6 +81,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
@@ -452,6 +454,45 @@ public boolean isPerfectRollup()
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}

/**
* In MSQ, multi-valued dimensions (MVDs) need conversion to arrays for rollup, and they aren't supported as
* partition columns. This check therefore exists specifically for MSQCompactionRunner: it returns true when
* dimension types aren't known from the CompactionTask spec, or when rollup may apply to a string dimension,
* or when a string dimension is used as a 'range' partition dimension.
*/
boolean needMultiValuedDimensions()
{
if (compactionRunner instanceof NativeCompactionRunner) {
return false;
}
// Rollup can be true even when granularitySpec is not known since rollup is then decided based on segment analysis
boolean isPossiblyRollup = !(granularitySpec != null && Boolean.FALSE.equals(granularitySpec.isRollup()));
boolean isRangePartitioned = tuningConfig != null
&& tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec;

if (dimensionsSpec == null || dimensionsSpec.getDimensions().isEmpty()) {
return (isPossiblyRollup || isRangePartitioned);
} else {
boolean isRollupOnStringDimension = isPossiblyRollup &&
dimensionsSpec.getDimensions()
.stream()
.anyMatch(dim -> ColumnType.STRING.equals(
dim.getColumnType()));

boolean isPartitionedOnStringDimension =
isRangePartitioned &&
dimensionsSpec.getDimensions()
.stream()
.anyMatch(
dim -> ColumnType.STRING.equals(
dim.getColumnType())
&& ((DimensionRangePartitionsSpec) tuningConfig.getPartitionsSpec())
.getPartitionDimensions()
.contains(dim.getName())
);
return isRollupOnStringDimension || isPartitionedOnStringDimension;
}
}
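
To make the branching above easier to scan, a standalone restatement (a sketch, not Druid API; the class and parameter names are illustrative):

```java
import java.util.List;
import java.util.Set;

// Sketch of the decision implemented by needMultiValuedDimensions() above.
final class MvdAnalysisDecision
{
  static boolean needsSegmentAnalysis(
      boolean isPossiblyRollup,         // rollup not explicitly set to false
      boolean isRangePartitioned,       // partitionsSpec is a DimensionRangePartitionsSpec
      List<String> declaredDimensions,  // all declared dimensions; null/empty if unknown
      Set<String> stringDimensions,     // subset of declaredDimensions typed STRING
      Set<String> partitionDimensions   // 'range' partition keys (empty if not range-partitioned)
  )
  {
    if (declaredDimensions == null || declaredDimensions.isEmpty()) {
      // Dimension types unknown: analyze segments whenever the answer could matter.
      return isPossiblyRollup || isRangePartitioned;
    }
    boolean rollupOnString = isPossiblyRollup && !stringDimensions.isEmpty();
    boolean rangeOnString = isRangePartitioned
                            && partitionDimensions.stream().anyMatch(stringDimensions::contains);
    return rollupOnString || rangeOnString;
  }
}
```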

@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
Expand All @@ -466,7 +507,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
metricsSpec,
granularitySpec,
getMetricBuilder(),
!(compactionRunner instanceof NativeCompactionRunner)
this.needMultiValuedDimensions()
);

registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
@@ -794,23 +835,25 @@ static class ExistingSegmentAnalyzer
this.needMultiValuedDimensions = needMultiValuedDimensions;
}

private boolean shouldFetchSegments()
/**
* Segments are downloaded even when only needMultiValuedDimensions=true, because MSQ silently switches to
* dynamic partitioning when it finds a 'range' partition dimension to be multi-valued at runtime. That creates
* a mismatch between the compaction config and the actual segment state (lastCompactionState), leading to
* repeated compactions.
*/
private boolean shouldDownloadSegments()
{
// Don't fetch segments for just needMultiValueDimensions
return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec;

return needRollup || needQueryGranularity || needDimensionsSpec || needMetricsSpec || needMultiValuedDimensions;
}

public void fetchAndProcessIfNeeded()
{
if (!shouldFetchSegments()) {
if (!shouldDownloadSegments()) {
// Nothing to do; short-circuit and don't fetch segments.
return;
}

if (needMultiValuedDimensions) {
multiValuedDimensions = new HashSet<>();
}

multiValuedDimensions = new HashSet<>();
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> segments = sortSegmentsListNewestFirst();

for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> segmentPair : segments) {
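The per-segment loop is cut off by the diff above; a hedged sketch of how a column could be flagged as multi-valued inside it, assuming the ColumnCapabilities and ColumnHolder APIs imported earlier in this file (the method name is hypothetical):

```java
// Sketch (assumed, not shown in this PR): a column is treated as multi-valued
// if any analyzed segment reports it as possibly multi-valued.
void collectMultiValuedDimensions(QueryableIndex index)
{
  for (String columnName : index.getColumnNames()) {
    final ColumnHolder holder = index.getColumnHolder(columnName);
    if (holder != null && holder.getCapabilities().hasMultipleValues().isMaybeTrue()) {
      multiValuedDimensions.add(columnName);
    }
  }
}
```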