Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail MSQ compaction if multi-valued partition dimensions are found #17344

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public MSQCompactionRunner(
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedParititionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>'range' partitionsSpec with non-single-valued-string partition dimensions.</li>
gargvishesh marked this conversation as resolved.
Show resolved Hide resolved
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
Expand All @@ -153,11 +153,15 @@ public CompactionConfigValidationResult validateCompactionTask(
);
}
List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
DataSchema dataSchema = Iterables.getOnlyElement(intervalToDataSchemaMap.values());
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
if (compactionTask.getTuningConfig() != null) {
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
dataSchema.getDimensionsSpec().getDimensions(),
dataSchema instanceof CombinedDataSchema
? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
: null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
Expand Down Expand Up @@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
COMPACTION_INTERVAL,
new DataSchema.Builder()
.withDataSource(DATA_SOURCE)
.withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
.withDimensions(new DimensionsSpec(DIMENSIONS))
.build()
new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
null,
null,
null,
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
Expand Down Expand Up @@ -200,6 +205,28 @@ public void testLongDimensionInRangePartitionsSpecIsInvalid()
);
}

@Test
public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
{
List<String> mvStringPartitionDimension = Collections.singletonList(MV_STRING_DIMENSION.getName());
CompactionTask compactionTask = createCompactionTask(
new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, mvStringPartitionDimension, false),
null,
Collections.emptyMap(),
null,
null
);

CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: Multi-valued string partition dimension[mv_string_dim] not supported with 'range' partition spec",
validationResult.getReason()
);
}

@Test
public void testMaxTotalRowsIsInvalid()
{
Expand Down Expand Up @@ -253,7 +280,7 @@ public void testRollupFalseWithMetricsSpecIsInValid()
}

@Test
public void testRollupTrueWithoutMetricsSpecIsInValid()
public void testRollupTrueWithoutMetricsSpecIsInvalid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
Expand All @@ -266,7 +293,7 @@ public void testRollupTrueWithoutMetricsSpecIsInValid()
}

@Test
public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having different input and ouput column names are unsupported.
final String inputColName = "added";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -107,7 +108,7 @@ public static CompactionConfigValidationResult validateCompactionConfig(
* Checks if the provided compaction config is supported by MSQ. The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedParititionsSpec.</li>
* <li>'range' partitionsSpec with non-string partition dimensions.</li>
* <li>'range' partitionsSpec with non-single-valued-string partition dimensions.</li>
gargvishesh marked this conversation as resolved.
Show resolved Hide resolved
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
Expand All @@ -119,7 +120,9 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn
if (newConfig.getTuningConfig() != null) {
validationResults.add(validatePartitionsSpecForMSQ(
newConfig.getTuningConfig().getPartitionsSpec(),
newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions(),
// multi-valued dimensions info is not available inside the compaction config
null
));
}
if (newConfig.getGranularitySpec() != null) {
Expand All @@ -138,13 +141,19 @@ private static CompactionConfigValidationResult compactionConfigSupportedByMSQEn

/**
* Validate that partitionSpec is either 'dynamic` or 'range'. If 'dynamic', ensure 'maxTotalRows' is null. If range
* ensure all partition columns are of string type.
* ensure all partition columns are of single-valued string type .
gargvishesh marked this conversation as resolved.
Show resolved Hide resolved
*/
public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
@Nullable PartitionsSpec partitionsSpec,
@Nullable List<DimensionSchema> dimensionSchemas
@Nullable List<DimensionSchema> dimensionSchemas,
@Nullable Set<String> multiValuedDimensions
)
{
if (partitionsSpec == null) {
return CompactionConfigValidationResult.failure(
"MSQ: tuningConfig.partitionsSpec must be set"
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
);
}
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
return CompactionConfigValidationResult.failure(
Expand All @@ -163,17 +172,26 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
Collectors.toMap(DimensionSchema::getName, Function.identity())
);
Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
Optional<String> unsupportedDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
.getPartitionDimensions()
.stream()
.filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
.filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType())
|| multiValuedDimensions != null && multiValuedDimensions.contains(dim))
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
.findAny();
if (nonStringDimension.isPresent()) {
return CompactionConfigValidationResult.failure(
"MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec",
nonStringDimension.get(),
dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
);
if (unsupportedDimension.isPresent()) {
DimensionSchema unsupportedDimensionSchema = dimensionSchemaMap.get(unsupportedDimension.get());
if (ColumnType.STRING.equals(unsupportedDimensionSchema.getColumnType())) {
return CompactionConfigValidationResult.failure(
"MSQ: Multi-valued string partition dimension[%s] not supported with 'range' partition spec",
unsupportedDimensionSchema.getName()
);
} else {
return CompactionConfigValidationResult.failure(
"MSQ: Non-string partition dimension[%s] of type[%s] not supported with 'range' partition spec",
unsupportedDimensionSchema.getName(),
unsupportedDimensionSchema.getTypeName()
);
}
}
}
return CompactionConfigValidationResult.success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,27 @@ public void testMSQEngineWithMaxTotalRowsIsInvalid()
);
}

@Test
public void testMSQEngineWithNullPartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
null,
Collections.emptyMap(),
null,
null,
null
);
CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
compactionConfig,
CompactionEngine.NATIVE
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
"MSQ: tuningConfig.partitionsSpec must be specified",
validationResult.getReason()
);
}

@Test
public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
Expand Down Expand Up @@ -253,7 +274,7 @@ private static DataSourceCompactionConfig createMSQCompactionConfig(
List<DimensionSchema> dimensions
)
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
return new DataSourceCompactionConfig(
"dataSource",
null,
500L,
Expand All @@ -268,12 +289,11 @@ private static DataSourceCompactionConfig createMSQCompactionConfig(
CompactionEngine.MSQ,
context
);
return config;
}

private static UserCompactionTaskQueryTuningConfig createTuningConfig(PartitionsSpec partitionsSpec)
{
final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
return new UserCompactionTaskQueryTuningConfig(
40000,
null,
2000L,
Expand Down Expand Up @@ -302,6 +322,5 @@ private static UserCompactionTaskQueryTuningConfig createTuningConfig(Partitions
100,
2
);
return tuningConfig;
}
}
Loading