Skip to content

Commit

Permalink
insert explict mappings for objects in nested output to avoid clashes…
Browse files Browse the repository at this point in the history
… with

index templates

fixes elastic#51321
  • Loading branch information
Hendrik Muhs committed Jan 23, 2020
1 parent 23be11c commit 54ecb1f
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
private static boolean indicesCreated = false;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

@Before
public void createIndexes() throws IOException {

// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}

createReviewsIndex();
indicesCreated = true;
}

public void testIndexTemplateMappingClash() throws Exception {
String transformId = "special_pivot_template_mappings_clash";
String transformIndex = "special_pivot_template_mappings_clash";

// create a template that defines a field "rating" with a type "float" which will clash later with
// output field "rating.avg" in the pivot config
final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template");

String template = "{"
+ "\"index_patterns\" : [\"special_pivot_template*\"],"
+ " \"mappings\" : {"
+ " \"properties\": {"
+ " \"rating\":{"
+ " \"type\": \"float\"\n"
+ " }"
+ " }"
+ " }"
+ "}";


createIndexTemplateRequest.setJsonEntity(template);
Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));

final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);

String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"rating.avg\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } }"
+ " } }"
+ "}";

createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));

// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

// get and check some users
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ static int getDataFrameCheckpoint(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);

// assert that the transform did not fail
assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public final class SchemaUtil {
NUMERIC_FIELD_MAPPER_TYPES = types;
}

private SchemaUtil() {
}
private SchemaUtil() {}

public static boolean isNumericType(String type) {
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
Expand All @@ -59,27 +58,29 @@ public static boolean isNumericType(String type) {
* @param source Source index that contains the data to pivot
* @param listener Listener to alert on success or failure.
*/
public static void deduceMappings(final Client client,
final PivotConfig config,
final String[] source,
final ActionListener<Map<String, String>> listener) {
public static void deduceMappings(
final Client client,
final PivotConfig config,
final String[] source,
final ActionListener<Map<String, String>> listener
) {
// collects the fieldnames used as source for aggregations
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
// collects the aggregation types by source name
Map<String, String> aggregationTypes = new HashMap<>();
// collects the fieldnames and target fieldnames used for grouping
Map<String, String> fieldNamesForGrouping = new HashMap<>();

config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> {
fieldNamesForGrouping.put(destinationFieldName, group.getField());
});
config.getGroupConfig()
.getGroups()
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });

for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
} else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
aggregationTypes.put(agg.getName(), agg.getType());
} else {
// execution should not reach this point
Expand All @@ -98,13 +99,17 @@ public static void deduceMappings(final Client client,
allFieldNames.putAll(aggregationSourceFieldNames);
allFieldNames.putAll(fieldNamesForGrouping);

getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
getSourceFieldMappings(
client,
source,
allFieldNames.values().toArray(new String[0]),
ActionListener.wrap(
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
aggregationTypes,
fieldNamesForGrouping,
sourceMappings)),
listener::onFailure));
sourceMappings -> listener.onResponse(
resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
),
listener::onFailure
)
);
}

/**
Expand All @@ -115,36 +120,37 @@ public static void deduceMappings(final Client client,
* @param index The index, or index pattern, from which to gather all the field mappings
* @param listener The listener to be alerted on success or failure.
*/
public static void getDestinationFieldMappings(final Client client,
final String index,
final ActionListener<Map<String, String>> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
.indices(index)
public static void getDestinationFieldMappings(
final Client client,
final String index,
final ActionListener<Map<String, String>> listener
) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
.fields("*")
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.TRANSFORM_ORIGIN,
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
ActionListener.wrap(
r -> listener.onResponse(extractFieldMappings(r)),
listener::onFailure
));
ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure)
);
}

private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
Map<String, String> aggregationTypes,
Map<String, String> fieldNamesForGrouping,
Map<String, String> sourceMappings) {
private static Map<String, String> resolveMappings(
Map<String, String> aggregationSourceFieldNames,
Map<String, String> aggregationTypes,
Map<String, String> fieldNamesForGrouping,
Map<String, String> sourceMappings
) {
Map<String, String> targetMapping = new HashMap<>();

aggregationTypes.forEach((targetFieldName, aggregationName) -> {
String sourceFieldName = aggregationSourceFieldNames.get(targetFieldName);
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);

logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
targetFieldName, aggregationName, destinationMapping);
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);

if (Aggregations.isDynamicMapping(destinationMapping)) {
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
Expand All @@ -165,34 +171,75 @@ private static Map<String, String> resolveMappings(Map<String, String> aggregati
targetMapping.put(targetFieldName, "keyword");
}
});

// insert object mappings for nested fields
insertNestedObjectMappings(targetMapping);

return targetMapping;
}

/*
* Very "magic" helper method to extract the source mappings
*/
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
ActionListener<Map<String, String>> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
.indices(index)
private static void getSourceFieldMappings(
Client client,
String[] index,
String[] fields,
ActionListener<Map<String, String>> listener
) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
.fields(fields)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
response -> listener.onResponse(extractFieldMappings(response)),
listener::onFailure));
client.execute(
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)
);
}

private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
Map<String, String> extractedTypes = new HashMap<>();

response.get().forEach((fieldName, capabilitiesMap) -> {
// TODO: overwrites types, requires resolve if
// types are mixed
capabilitiesMap.forEach((name, capability) -> {
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
extractedTypes.put(fieldName, capability.getType());
});
});
response.get()
.forEach(
(fieldName, capabilitiesMap) -> {
// TODO: overwrites types, requires resolve if
// types are mixed
capabilitiesMap.forEach((name, capability) -> {
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
extractedTypes.put(fieldName, capability.getType());
});
}
);
return extractedTypes;
}

/**
* Insert object mappings for fields like:
*
* a.b.c : some_type
*
* in which case it creates additional mappings:
*
* a.b : object
* a : object
*
* avoids snafu with index templates injecting incompatible mappings
*
* @param fieldMappings field mappings to inject to
*/
static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
Map<String, String> additionalMappings = new HashMap<>();
fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
int pos;
String objectKey = key;
// lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0
while ((pos = objectKey.lastIndexOf(".")) > 0) {
objectKey = objectKey.substring(0, pos);
additionalMappings.putIfAbsent(objectKey, "object");
}
});

additionalMappings.forEach(fieldMappings::putIfAbsent);
}
}

0 comments on commit 54ecb1f

Please sign in to comment.