Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x][Transform] avoid mapping problems with index templates (#51368) #51519

Merged
merged 1 commit into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
private static boolean indicesCreated = false;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

@Before
public void createIndexes() throws IOException {

// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}

createReviewsIndex();
indicesCreated = true;
}

public void testIndexTemplateMappingClash() throws Exception {
String transformId = "special_pivot_template_mappings_clash";
String transformIndex = "special_pivot_template_mappings_clash";

// create a template that defines a field "rating" with a type "float" which will clash later with
// output field "rating.avg" in the pivot config
final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template");

String template = "{"
+ "\"index_patterns\" : [\"special_pivot_template*\"],"
+ " \"mappings\" : {"
+ " \"properties\": {"
+ " \"rating\":{"
+ " \"type\": \"float\"\n"
+ " }"
+ " }"
+ " }"
+ "}";

createIndexTemplateRequest.setJsonEntity(template);
Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest));
assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE));

final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);

String config = "{"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},";

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"rating.avg\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } }"
+ " } }"
+ "}";

createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));

// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

// get and check some users
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ static int getTransformCheckpoint(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));

Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);

// assert that the transform did not fail
assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public final class SchemaUtil {
NUMERIC_FIELD_MAPPER_TYPES = types;
}

private SchemaUtil() {
}
private SchemaUtil() {}

public static boolean isNumericType(String type) {
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
Expand All @@ -59,27 +58,29 @@ public static boolean isNumericType(String type) {
* @param source Source index that contains the data to pivot
* @param listener Listener to alert on success or failure.
*/
public static void deduceMappings(final Client client,
final PivotConfig config,
final String[] source,
final ActionListener<Map<String, String>> listener) {
public static void deduceMappings(
final Client client,
final PivotConfig config,
final String[] source,
final ActionListener<Map<String, String>> listener
) {
// collects the fieldnames used as source for aggregations
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
// collects the aggregation types by source name
Map<String, String> aggregationTypes = new HashMap<>();
// collects the fieldnames and target fieldnames used for grouping
Map<String, String> fieldNamesForGrouping = new HashMap<>();

config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> {
fieldNamesForGrouping.put(destinationFieldName, group.getField());
});
config.getGroupConfig()
.getGroups()
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });

for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
if (agg instanceof ValuesSourceAggregationBuilder) {
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
} else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
aggregationTypes.put(agg.getName(), agg.getType());
} else {
// execution should not reach this point
Expand All @@ -98,13 +99,17 @@ public static void deduceMappings(final Client client,
allFieldNames.putAll(aggregationSourceFieldNames);
allFieldNames.putAll(fieldNamesForGrouping);

getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]),
getSourceFieldMappings(
client,
source,
allFieldNames.values().toArray(new String[0]),
ActionListener.wrap(
sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames,
aggregationTypes,
fieldNamesForGrouping,
sourceMappings)),
listener::onFailure));
sourceMappings -> listener.onResponse(
resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings)
),
listener::onFailure
)
);
}

/**
Expand All @@ -115,36 +120,37 @@ public static void deduceMappings(final Client client,
* @param index The index, or index pattern, from which to gather all the field mappings
* @param listener The listener to be alerted on success or failure.
*/
public static void getDestinationFieldMappings(final Client client,
final String index,
final ActionListener<Map<String, String>> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
.indices(index)
public static void getDestinationFieldMappings(
final Client client,
final String index,
final ActionListener<Map<String, String>> listener
) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
.fields("*")
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.TRANSFORM_ORIGIN,
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
ActionListener.wrap(
r -> listener.onResponse(extractFieldMappings(r)),
listener::onFailure
));
ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure)
);
}

private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames,
Map<String, String> aggregationTypes,
Map<String, String> fieldNamesForGrouping,
Map<String, String> sourceMappings) {
private static Map<String, String> resolveMappings(
Map<String, String> aggregationSourceFieldNames,
Map<String, String> aggregationTypes,
Map<String, String> fieldNamesForGrouping,
Map<String, String> sourceMappings
) {
Map<String, String> targetMapping = new HashMap<>();

aggregationTypes.forEach((targetFieldName, aggregationName) -> {
String sourceFieldName = aggregationSourceFieldNames.get(targetFieldName);
String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName);
String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping);

logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]",
targetFieldName, aggregationName, destinationMapping);
logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping);

if (Aggregations.isDynamicMapping(destinationMapping)) {
logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName);
Expand All @@ -165,34 +171,75 @@ private static Map<String, String> resolveMappings(Map<String, String> aggregati
targetMapping.put(targetFieldName, "keyword");
}
});

// insert object mappings for nested fields
insertNestedObjectMappings(targetMapping);

return targetMapping;
}

/*
* Very "magic" helper method to extract the source mappings
*/
private static void getSourceFieldMappings(Client client, String[] index, String[] fields,
ActionListener<Map<String, String>> listener) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest()
.indices(index)
private static void getSourceFieldMappings(
Client client,
String[] index,
String[] fields,
ActionListener<Map<String, String>> listener
) {
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index)
.fields(fields)
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap(
response -> listener.onResponse(extractFieldMappings(response)),
listener::onFailure));
client.execute(
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure)
);
}

private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) {
Map<String, String> extractedTypes = new HashMap<>();

response.get().forEach((fieldName, capabilitiesMap) -> {
// TODO: overwrites types, requires resolve if
// types are mixed
capabilitiesMap.forEach((name, capability) -> {
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
extractedTypes.put(fieldName, capability.getType());
});
});
response.get()
.forEach(
(fieldName, capabilitiesMap) -> {
// TODO: overwrites types, requires resolve if
// types are mixed
capabilitiesMap.forEach((name, capability) -> {
logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType());
extractedTypes.put(fieldName, capability.getType());
});
}
);
return extractedTypes;
}

/**
* Insert object mappings for fields like:
*
* a.b.c : some_type
*
* in which case it creates additional mappings:
*
* a.b : object
* a : object
*
* avoids snafu with index templates injecting incompatible mappings
*
* @param fieldMappings field mappings to inject to
*/
static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
Map<String, String> additionalMappings = new HashMap<>();
fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
int pos;
String objectKey = key;
// lastIndexOf returns -1 on mismatch, but to disallow empty strings check for > 0
while ((pos = objectKey.lastIndexOf(".")) > 0) {
objectKey = objectKey.substring(0, pos);
additionalMappings.putIfAbsent(objectKey, "object");
}
});

additionalMappings.forEach(fieldMappings::putIfAbsent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.transforms.pivot;

import org.elasticsearch.test.ESTestCase;

import java.util.HashMap;
import java.util.Map;

public class SchemaUtilTests extends ESTestCase {

public void testInsertNestedObjectMappings() {
Map<String, String> fieldMappings = new HashMap<>();

// creates: a.b, a
fieldMappings.put("a.b.c", "long");
fieldMappings.put("a.b.d", "double");
// creates: c.b, c
fieldMappings.put("c.b.a", "double");
// creates: c.d
fieldMappings.put("c.d.e", "object");
fieldMappings.put("d", "long");
fieldMappings.put("e.f.g", "long");
// cc: already there
fieldMappings.put("e.f", "object");
// cc: already there but different type (should not be possible)
fieldMappings.put("e", "long");
// cc: start with . (should not be possible)
fieldMappings.put(".x", "long");
// cc: start and ends with . (should not be possible), creates: .y
fieldMappings.put(".y.", "long");
// cc: ends with . (should not be possible), creates: .z
fieldMappings.put(".z.", "long");

SchemaUtil.insertNestedObjectMappings(fieldMappings);

assertEquals(18, fieldMappings.size());
assertEquals("long", fieldMappings.get("a.b.c"));
assertEquals("object", fieldMappings.get("a.b"));
assertEquals("double", fieldMappings.get("a.b.d"));
assertEquals("object", fieldMappings.get("a"));
assertEquals("object", fieldMappings.get("c.d"));
assertEquals("object", fieldMappings.get("e.f"));
assertEquals("long", fieldMappings.get("e"));
assertEquals("object", fieldMappings.get(".y"));
assertEquals("object", fieldMappings.get(".z"));
assertFalse(fieldMappings.containsKey("."));
assertFalse(fieldMappings.containsKey(""));
}

}