diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java new file mode 100644 index 0000000000000..56fcc673ff65e --- /dev/null +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.client.Request; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.junit.Before; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase { + private static boolean indicesCreated = false; + + // preserve indices in order to reuse source indices in several test cases + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Before + public void createIndexes() throws IOException { + + // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack + if (indicesCreated) { + return; + } + + createReviewsIndex(); + indicesCreated = true; + } + + public void testIndexTemplateMappingClash() throws Exception { + String transformId = "special_pivot_template_mappings_clash"; + String transformIndex = "special_pivot_template_mappings_clash"; + + // create a template that defines a field "rating" with a type "float" which will clash later with + // output field "rating.avg" in the pivot config + final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template"); + + String template = "{" + + "\"index_patterns\" : [\"special_pivot_template*\"]," + + " \"mappings\" : {" + + " \"properties\": {" + + " \"rating\":{" + + " \"type\": \"float\"\n" + + " }" + + " }" + + " }" + + "}"; + + + createIndexTemplateRequest.setJsonEntity(template); + Map createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest)); + assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId); + + String config = "{" + + " \"source\": {\"index\":\"" + + REVIEWS_INDEX_NAME + + "\"}," + + " \"dest\": {\"index\":\"" + + transformIndex + + "\"},"; + + config += " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"rating.avg\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } }" + + " } }" + + "}"; + + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForTransform(transformId, transformIndex); + assertTrue(indexExists(transformIndex)); + + // we expect 27 documents as there shall be 27 user_id's + Map indexStats = getAsMap(transformIndex + "/_stats"); + assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + // get and check some users + Map searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4"); + + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + Number actual = (Number) ((List) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0); + assertEquals(3.878048780, actual.doubleValue(), 0.000001); + } +} diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index d9c61dddeab47..cb931cd58b906 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -424,6 +424,9 @@ static int getDataFrameCheckpoint(String transformId) throws IOException { Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats")); Map transformStatsAsMap = (Map) ((List) entityAsMap(statsResponse).get("transforms")).get(0); + + // assert that the transform did not fail + assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap)); return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index e409f923edebd..b9b3d121e5aea 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -42,8 +42,7 @@ public final class SchemaUtil { NUMERIC_FIELD_MAPPER_TYPES = types; } - private SchemaUtil() { - } + private SchemaUtil() {} public static boolean isNumericType(String type) { return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type); @@ -59,10 +58,12 @@ public static boolean isNumericType(String type) { * @param source Source index that contains the data to pivot * @param listener Listener to alert on success or failure. */ - public static void deduceMappings(final Client client, - final PivotConfig config, - final String[] source, - final ActionListener> listener) { + public static void deduceMappings( + final Client client, + final PivotConfig config, + final String[] source, + final ActionListener> listener + ) { // collects the fieldnames used as source for aggregations Map aggregationSourceFieldNames = new HashMap<>(); // collects the aggregation types by source name @@ -70,16 +71,16 @@ public static void deduceMappings(final Client client, // collects the fieldnames and target fieldnames used for grouping Map fieldNamesForGrouping = new HashMap<>(); - config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> { - fieldNamesForGrouping.put(destinationFieldName, group.getField()); - }); + config.getGroupConfig() + .getGroups() + .forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); }); for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { if (agg instanceof ValuesSourceAggregationBuilder) { ValuesSourceAggregationBuilder valueSourceAggregation = (ValuesSourceAggregationBuilder) agg; aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field()); aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType()); - } else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) { + } else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) { aggregationTypes.put(agg.getName(), agg.getType()); } else { // execution should not reach this point @@ -98,13 +99,17 @@ public static void deduceMappings(final Client client, allFieldNames.putAll(aggregationSourceFieldNames); allFieldNames.putAll(fieldNamesForGrouping); - getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]), + getSourceFieldMappings( + client, + source, + allFieldNames.values().toArray(new String[0]), ActionListener.wrap( - sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, - aggregationTypes, - fieldNamesForGrouping, - sourceMappings)), - listener::onFailure)); + sourceMappings -> listener.onResponse( + resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings) + ), + listener::onFailure + ) + ); } /** @@ -115,27 +120,29 @@ public static void deduceMappings(final Client client, * @param index The index, or index pattern, from which to gather all the field mappings * @param listener The listener to be alerted on success or failure. */ - public static void getDestinationFieldMappings(final Client client, - final String index, - final ActionListener> listener) { - FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() - .indices(index) + public static void getDestinationFieldMappings( + final Client client, + final String index, + final ActionListener> listener + ) { + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index) .fields("*") .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.executeAsyncWithOrigin( + client, ClientHelper.TRANSFORM_ORIGIN, FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, - ActionListener.wrap( - r -> listener.onResponse(extractFieldMappings(r)), - listener::onFailure - )); + ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure) + ); } - private static Map resolveMappings(Map aggregationSourceFieldNames, - Map aggregationTypes, - Map fieldNamesForGrouping, - Map sourceMappings) { + private static Map resolveMappings( + Map aggregationSourceFieldNames, + Map aggregationTypes, + Map fieldNamesForGrouping, + Map sourceMappings + ) { Map targetMapping = new HashMap<>(); aggregationTypes.forEach((targetFieldName, aggregationName) -> { @@ -143,8 +150,7 @@ private static Map resolveMappings(Map aggregati String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName); String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping); - logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", - targetFieldName, aggregationName, destinationMapping); + logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping); if (Aggregations.isDynamicMapping(destinationMapping)) { logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName); @@ -165,34 +171,75 @@ private static Map resolveMappings(Map aggregati targetMapping.put(targetFieldName, "keyword"); } }); + + // insert object mappings for nested fields + insertNestedObjectMappings(targetMapping); + return targetMapping; } /* * Very "magic" helper method to extract the source mappings */ - private static void getSourceFieldMappings(Client client, String[] index, String[] fields, - ActionListener> listener) { - FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() - .indices(index) + private static void getSourceFieldMappings( + Client client, + String[] index, + String[] fields, + ActionListener> listener + ) { + FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index) .fields(fields) .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap( - response -> listener.onResponse(extractFieldMappings(response)), - listener::onFailure)); + client.execute( + FieldCapabilitiesAction.INSTANCE, + fieldCapabilitiesRequest, + ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure) + ); } private static Map extractFieldMappings(FieldCapabilitiesResponse response) { Map extractedTypes = new HashMap<>(); - response.get().forEach((fieldName, capabilitiesMap) -> { - // TODO: overwrites types, requires resolve if - // types are mixed - capabilitiesMap.forEach((name, capability) -> { - logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); - extractedTypes.put(fieldName, capability.getType()); - }); - }); + response.get() + .forEach( + (fieldName, capabilitiesMap) -> { + // TODO: overwrites types, requires resolve if + // types are mixed + capabilitiesMap.forEach((name, capability) -> { + logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); + extractedTypes.put(fieldName, capability.getType()); + }); + } + ); return extractedTypes; } + + /** + * Insert object mappings for fields like: + * + * a.b.c : some_type + * + * in which case it creates additional mappings: + * + * a.b : object + * a : object + * + * avoids snafu with index templates injecting incompatible mappings + * + * @param fieldMappings field mappings to inject to + */ + static void insertNestedObjectMappings(Map fieldMappings) { + Map additionalMappings = new HashMap<>(); + fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> { + int pos; + String objectKey = key; + // lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0 + while ((pos = objectKey.lastIndexOf(".")) > 0) { + objectKey = objectKey.substring(0, pos); + additionalMappings.putIfAbsent(objectKey, "object"); + } + }); + + additionalMappings.forEach(fieldMappings::putIfAbsent); + } }