diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java index 5e81a368f66b4..f808fa867209a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DestConfig.java @@ -28,6 +28,7 @@ import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; /** * Configuration containing the destination index for the {@link DataFrameTransformConfig} @@ -35,29 +36,40 @@ public class DestConfig implements ToXContentObject { public static final ParseField INDEX = new ParseField("index"); + public static final ParseField PIPELINE = new ParseField("pipeline"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_config_dest", true, - args -> new DestConfig((String)args[0])); + args -> new DestConfig((String)args[0], (String)args[1])); static { PARSER.declareString(constructorArg(), INDEX); + PARSER.declareString(optionalConstructorArg(), PIPELINE); } private final String index; + private final String pipeline; - public DestConfig(String index) { + DestConfig(String index, String pipeline) { this.index = Objects.requireNonNull(index, INDEX.getPreferredName()); + this.pipeline = pipeline; } public String getIndex() { return index; } + public String getPipeline() { + return pipeline; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(INDEX.getPreferredName(), index); + if (pipeline != null) { + builder.field(PIPELINE.getPreferredName(), pipeline); + } builder.endObject(); return builder; } @@ -72,11 +84,45 @@ public boolean equals(Object other) { } DestConfig that = (DestConfig) other; - return Objects.equals(index, that.index); + return Objects.equals(index, that.index) && + Objects.equals(pipeline, that.pipeline); } @Override public int hashCode(){ - return Objects.hash(index); + return Objects.hash(index, pipeline); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String index; + private String pipeline; + + /** + * Sets which index to which to write the data + * @param index where to write the data + * @return The {@link Builder} with index set + */ + public Builder setIndex(String index) { + this.index = Objects.requireNonNull(index, INDEX.getPreferredName()); + return this; + } + + /** + * Sets the pipeline through which the indexed documents should be processed + * @param pipeline The pipeline ID + * @return The {@link Builder} with pipeline set + */ + public Builder setPipeline(String pipeline) { + this.pipeline = pipeline; + return this; + } + + public DestConfig build() { + return new DestConfig(index, pipeline); + } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 5e00dfb8ed3c9..17d423ba4fdd8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -307,7 +307,7 @@ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build(); - DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null; + DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null; return DataFrameTransformConfig.builder() .setId(id) @@ -333,7 +333,7 @@ public void testGetStats() throws Exception { DataFrameTransformConfig transform = DataFrameTransformConfig.builder() .setId(id) .setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build()) - .setDest(new DestConfig("pivot-dest")) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) .setPivotConfig(pivotConfig) .setDescription("transform for testing stats") .build(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java index f2950b64cf7c9..0dc8f99d7631b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DestConfigTests.java @@ -27,7 +27,8 @@ public class DestConfigTests extends AbstractXContentTestCase { public static DestConfig randomDestConfig() { - return new DestConfig(randomAlphaOfLength(10)); + return new DestConfig(randomAlphaOfLength(10), + randomBoolean() ? null : randomAlphaOfLength(10)); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index f0dfda1d58902..6604e97ed5b97 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -125,6 +125,11 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException .setIndex("source-index") .setQueryConfig(queryConfig).build(); // end::put-data-frame-transform-source-config + // tag::put-data-frame-transform-dest-config + DestConfig destConfig = DestConfig.builder() + .setIndex("pivot-destination") + .setPipeline("my-pipeline").build(); + // end::put-data-frame-transform-dest-config // tag::put-data-frame-transform-group-config GroupConfig groupConfig = GroupConfig.builder() .groupBy("reviewer", // <1> @@ -149,7 +154,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException .builder() .setId("reviewer-avg-rating") // <1> .setSource(sourceConfig) // <2> - .setDest(new DestConfig("pivot-destination")) // <3> + .setDest(destConfig) // <3> .setPivotConfig(pivotConfig) // <4> .setDescription("This is my test transform") // <5> .build(); @@ -222,7 +227,7 @@ public void testStartStop() throws IOException, InterruptedException { DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder() .setId("mega-transform") .setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build()) - .setDest(new DestConfig("pivot-dest")) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) .setPivotConfig(pivotConfig) .build(); @@ -344,7 +349,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept .setIndex("source-data") .setQuery(new MatchAllQueryBuilder()) .build()) - .setDest(new DestConfig("pivot-dest")) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) .setPivotConfig(pivotConfig) .build(); DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder() @@ -353,7 +358,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept .setIndex("source-data") .setQuery(new MatchAllQueryBuilder()) .build()) - .setDest(new DestConfig("pivot-dest2")) + .setDest(DestConfig.builder().setIndex("pivot-dest2").build()) .setPivotConfig(pivotConfig) .build(); @@ -488,7 +493,7 @@ public void testGetStats() throws IOException, InterruptedException { .setIndex("source-data") .setQuery(new MatchAllQueryBuilder()) .build()) - .setDest(new DestConfig("pivot-dest")) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) .setPivotConfig(pivotConfig) .build(); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT); @@ -574,7 +579,7 @@ public void testGetDataFrameTransform() throws IOException, InterruptedException .setIndex("source-data") .setQuery(new MatchAllQueryBuilder()) .build()) - .setDest(new DestConfig("pivot-dest")) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) .setPivotConfig(pivotConfig) .build(); diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 567449c9c25b1..19c7fe443dbcd 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config] -------------------------------------------------- <1> The {dataframe-transform} ID <2> The source indices and query from which to gather data -<3> The destination index +<3> The destination index and optional pipeline <4> The PivotConfig <5> Optional free text description of the transform @@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default. include-tagged::{doc-tests-file}[{api}-source-config] -------------------------------------------------- +==== DestConfig + +The index where to write the data and the optional pipeline +through which the docs should be indexed + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-dest-config] +-------------------------------------------------- + ===== QueryConfig The query with which to select data from the source. diff --git a/docs/reference/data-frames/apis/put-transform.asciidoc b/docs/reference/data-frames/apis/put-transform.asciidoc index 428321aa0305f..10000126ef002 100644 --- a/docs/reference/data-frames/apis/put-transform.asciidoc +++ b/docs/reference/data-frames/apis/put-transform.asciidoc @@ -38,7 +38,8 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}. `source` (required):: (object) The source configuration, consisting of `index` and optionally a `query`. -`dest` (required):: (object) The destination configuration, consisting of `index`. +`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a +`pipeline` id. `pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to reduce the data. See <>. @@ -76,7 +77,8 @@ PUT _data_frame/transforms/ecommerce_transform } }, "dest": { - "index": "kibana_sample_data_ecommerce_transform" + "index": "kibana_sample_data_ecommerce_transform", + "pipeline": "add_timestamp_pipeline" }, "pivot": { "group_by": { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java index 90e0adba16269..7a2e05798908c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformAction.java @@ -24,10 +24,11 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -66,8 +67,20 @@ public Request(StreamInput in) throws IOException { public static Request fromXContent(final XContentParser parser) throws IOException { Map content = parser.map(); - // Destination and ID are not required for Preview, so we just supply our own - content.put(DataFrameField.DESTINATION.getPreferredName(), Collections.singletonMap("index", "unused-transform-preview-index")); + // dest.index and ID are not required for Preview, so we just supply our own + Map tempDestination = new HashMap<>(); + tempDestination.put(DestConfig.INDEX.getPreferredName(), "unused-transform-preview-index"); + // Users can still provide just dest.pipeline to preview what their data would look like given the pipeline ID + Object providedDestination = content.get(DataFrameField.DESTINATION.getPreferredName()); + if (providedDestination instanceof Map) { + @SuppressWarnings("unchecked") + Map destMap = (Map)providedDestination; + String pipeline = destMap.get(DestConfig.PIPELINE.getPreferredName()); + if (pipeline != null) { + tempDestination.put(DestConfig.PIPELINE.getPreferredName(), pipeline); + } + } + content.put(DataFrameField.DESTINATION.getPreferredName(), tempDestination); content.put(DataFrameField.ID.getPreferredName(), "transform-preview"); try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(content); XContentParser newParser = XContentType.JSON diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java index 29b2b8c5dc0e1..282a3f9a04484 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -20,10 +21,12 @@ import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class DestConfig implements Writeable, ToXContentObject { public static final ParseField INDEX = new ParseField("index"); + public static final ParseField PIPELINE = new ParseField("pipeline"); public static final ConstructingObjectParser STRICT_PARSER = createParser(false); public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); @@ -31,25 +34,37 @@ public class DestConfig implements Writeable, ToXContentObject { private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>("data_frame_config_dest", lenient, - args -> new DestConfig((String)args[0])); + args -> new DestConfig((String)args[0], (String) args[1])); parser.declareString(constructorArg(), INDEX); + parser.declareString(optionalConstructorArg(), PIPELINE); return parser; } private final String index; + private final String pipeline; - public DestConfig(String index) { + public DestConfig(String index, String pipeline) { this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName()); + this.pipeline = pipeline; } public DestConfig(final StreamInput in) throws IOException { index = in.readString(); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + pipeline = in.readOptionalString(); + } else { + pipeline = null; + } } public String getIndex() { return index; } + public String getPipeline() { + return pipeline; + } + public boolean isValid() { return index.isEmpty() == false; } @@ -57,12 +72,18 @@ public boolean isValid() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalString(pipeline); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(INDEX.getPreferredName(), index); + if (pipeline != null) { + builder.field(PIPELINE.getPreferredName(), pipeline); + } builder.endObject(); return builder; } @@ -77,12 +98,13 @@ public boolean equals(Object other) { } DestConfig that = (DestConfig) other; - return Objects.equals(index, that.index); + return Objects.equals(index, that.index) && + Objects.equals(pipeline, that.pipeline); } @Override public int hashCode(){ - return Objects.hash(index); + return Objects.hash(index, pipeline); } public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index 0cfc659e50646..c3a921a90d26b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -40,7 +40,7 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(), - new DestConfig("unused-transform-preview-index"), + new DestConfig("unused-transform-preview-index", null), null, PivotConfigTests.randomPivotConfig(), null); return new Request(config); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java index b29fa46c34ede..094267ba4adea 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DestConfigTests.java @@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase indexStats = getAsMap(dataFrameIndex + "/_stats"); + assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + // get and check some users + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + + Map searchResult = getAsMap(dataFrameIndex + "/_search?q=reviewer:user_0"); + Integer actual = (Integer) ((List) XContentMapValues.extractValue("hits.hits._source.pipeline_field", searchResult)).get(0); + assertThat(actual, equalTo(pipelineValue)); + } + public void testHistogramPivot() throws Exception { String transformId = "simple_histogram_pivot"; String dataFrameIndex = "pivot_reviews_via_histogram"; @@ -138,38 +178,38 @@ public void testBiggerPivot() throws Exception { + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"; config += " \"pivot\": {" - + " \"group_by\": {" - + " \"reviewer\": {" - + " \"terms\": {" - + " \"field\": \"user_id\"" - + " } } }," - + " \"aggregations\": {" - + " \"avg_rating\": {" - + " \"avg\": {" - + " \"field\": \"stars\"" - + " } }," - + " \"sum_rating\": {" - + " \"sum\": {" - + " \"field\": \"stars\"" - + " } }," - + " \"cardinality_business\": {" - + " \"cardinality\": {" - + " \"field\": \"business_id\"" - + " } }," - + " \"min_rating\": {" - + " \"min\": {" - + " \"field\": \"stars\"" - + " } }," - + " \"max_rating\": {" - + " \"max\": {" - + " \"field\": \"stars\"" - + " } }," - + " \"count\": {" - + " \"value_count\": {" - + " \"field\": \"business_id\"" - + " } }" - + " } }" - + "}"; + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"sum_rating\": {" + + " \"sum\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"cardinality_business\": {" + + " \"cardinality\": {" + + " \"field\": \"business_id\"" + + " } }," + + " \"min_rating\": {" + + " \"min\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"max_rating\": {" + + " \"max\": {" + + " \"field\": \"stars\"" + + " } }," + + " \"count\": {" + + " \"value_count\": {" + + " \"field\": \"business_id\"" + + " } }" + + " } }" + + "}"; createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); @@ -260,7 +300,7 @@ public void testPreviewTransform() throws Exception { createPreviewRequest.setJsonEntity(config); Map previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest)); - List> preview = (List>)previewDataframeResponse.get("preview"); + List> preview = (List>) previewDataframeResponse.get("preview"); // preview is limited to 100 assertThat(preview.size(), equalTo(100)); Set expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day")); @@ -268,6 +308,57 @@ public void testPreviewTransform() throws Exception { preview.forEach(p -> { Set keys = p.keySet(); assertThat(keys, equalTo(expectedTopLevelFields)); + Map nestedObj = (Map) p.get("user"); + keys = nestedObj.keySet(); + assertThat(keys, equalTo(expectedNestedFields)); + }); + } + + @SuppressWarnings("unchecked") + public void testPreviewTransformWithPipeline() throws Exception { + String pipelineId = "my-preview-pivot-pipeline"; + int pipelineValue = 42; + Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId); + pipelineRequest.setJsonEntity("{\n" + + " \"description\" : \"my pivot preview pipeline\",\n" + + " \"processors\" : [\n" + + " {\n" + + " \"set\" : {\n" + + " \"field\": \"pipeline_field\",\n" + + " \"value\": " + pipelineValue + + " }\n" + + " }\n" + + " ]\n" + + "}"); + client().performRequest(pipelineRequest); + + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + final Request createPreviewRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + "_preview", null); + + String config = "{ \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"} ," + + "\"dest\": {\"pipeline\": \"" + pipelineId + "\"}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"user.id\": {\"terms\": { \"field\": \"user_id\" }}," + + " \"by_day\": {\"date_histogram\": {\"fixed_interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-dd\"}}}," + + " \"aggregations\": {" + + " \"user.avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }" + + "}"; + createPreviewRequest.setJsonEntity(config); + + Map previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest)); + List> preview = (List>)previewDataframeResponse.get("preview"); + // preview is limited to 100 + assertThat(preview.size(), equalTo(100)); + Set expectedTopLevelFields = new HashSet<>(Arrays.asList("user", "by_day", "pipeline_field")); + Set expectedNestedFields = new HashSet<>(Arrays.asList("id", "avg_rating")); + preview.forEach(p -> { + Set keys = p.keySet(); + assertThat(keys, equalTo(expectedTopLevelFields)); + assertThat(p.get("pipeline_field"), equalTo(pipelineValue)); Map nestedObj = (Map)p.get("user"); keys = nestedObj.keySet(); assertThat(keys, equalTo(expectedNestedFields)); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index bb82b6a040478..30435a8490328 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -147,12 +147,23 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI createPivotReviewsTransform(transformId, dataFrameIndex, query, null); } - protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String authHeader) + protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline) + throws IOException { + createPivotReviewsTransform(transformId, dataFrameIndex, query, pipeline, null); + } + + + protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader) throws IOException { final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, authHeader); - String config = "{" - + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"; + String config = "{"; + + if (pipeline != null) { + config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\", \"pipeline\":\"" + pipeline + "\"},"; + } else { + config += " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"; + } if (query != null) { config += " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\", \"query\":{" + query + "}},"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index 7f320ff9aaf27..3efe8c4d012bb 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -119,7 +119,7 @@ protected void createReviewsIndex() throws Exception { public void testGetProgress() throws Exception { createReviewsIndex(); SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME); - DestConfig destConfig = new DestConfig("unnecessary"); + DestConfig destConfig = new DestConfig("unnecessary", null); GroupConfig histgramGroupConfig = new GroupConfig(Collections.emptyMap(), Collections.singletonMap("every_50", new HistogramGroupSource("count", 50.0))); AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index dde9edb37e55c..d9d4b45be332e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -6,8 +6,13 @@ package org.elasticsearch.xpack.dataframe.action; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ingest.SimulatePipelineAction; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -16,7 +21,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; @@ -26,6 +38,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; @@ -34,15 +47,19 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME; public class TransportPreviewDataFrameTransformAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportPreviewDataFrameTransformAction.class); private static final int NUMBER_OF_PREVIEW_BUCKETS = 100; private final XPackLicenseState licenseState; private final Client client; @@ -87,13 +104,41 @@ protected void doExecute(Task task, Pivot pivot = new Pivot(config.getPivotConfig()); - getPreview(pivot, config.getSource(), ActionListener.wrap( - previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), - listener::onFailure + getPreview(pivot, + config.getSource(), + config.getDestination().getPipeline(), + config.getDestination().getIndex(), + ActionListener.wrap( + previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), + error -> { + logger.error("Failure gathering preview", error); + listener.onFailure(error); + } )); } - private void getPreview(Pivot pivot, SourceConfig source, ActionListener>> listener) { + @SuppressWarnings("unchecked") + private void getPreview(Pivot pivot, + SourceConfig source, + String pipeline, + String dest, + ActionListener>> listener) { + ActionListener pipelineResponseActionListener = ActionListener.wrap( + simulatePipelineResponse -> { + List> response = new ArrayList<>(simulatePipelineResponse.getResults().size()); + for(var simulateDocumentResult : simulatePipelineResponse.getResults()) { + try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { + XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + Map tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), + true, + XContentType.JSON).v2(); + response.add((Map)XContentMapValues.extractValue("doc._source", tempMap)); + } + } + listener.onResponse(response); + }, + listener::onFailure + ); pivot.deduceMappings(client, source, ActionListener.wrap( deducedMappings -> { ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), @@ -103,17 +148,40 @@ private void getPreview(Pivot pivot, SourceConfig source, ActionListener { - try { final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId(); // remove all internal fields - List> results = pivot.extractResults(agg, deducedMappings, stats) - .peek(record -> { - record.keySet().removeIf(k -> k.startsWith("_")); - }).collect(Collectors.toList()); - listener.onResponse(results); + if (pipeline == null) { + List> results = pivot.extractResults(agg, deducedMappings, stats) + .peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_"))) + .collect(Collectors.toList()); + listener.onResponse(results); + } else { + List> results = pivot.extractResults(agg, deducedMappings, stats) + .map(doc -> { + Map src = new HashMap<>(); + String id = (String) doc.get(DataFrameField.DOCUMENT_ID_FIELD); + doc.keySet().removeIf(k -> k.startsWith("_")); + src.put("_source", doc); + src.put("_id", id); + src.put("_index", dest); + return src; + }).collect(Collectors.toList()); + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("docs", results); + builder.endObject(); + var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); + pipelineRequest.setId(pipeline); + ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + SimulatePipelineAction.INSTANCE, + pipelineRequest, + pipelineResponseActionListener); + } + } } catch (AggregationResultUtils.AggregationExtractionException extractionException) { listener.onFailure( new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index f2fc71da7f059..bb23bc9878f4c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -179,6 +179,9 @@ private Stream processBucketsToIndexRequests(CompositeAggregation } IndexRequest request = new IndexRequest(indexName).source(builder).id(id); + if (transformConfig.getDestination().getPipeline() != null) { + request.setPipeline(transformConfig.getDestination().getPipeline()); + } return request; }); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml index 7b5c4e8cb5664..98ef4039eafe4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml @@ -99,6 +99,49 @@ setup: - match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" } - match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" } + - do: + ingest.put_pipeline: + id: "data_frame_simple_pipeline" + body: > + { + "processors": [ + { + "set" : { + "field" : "my_field", + "value": 42 + } + } + ] + } + - match: { acknowledged: true } + - do: + data_frame.preview_data_frame_transform: + body: > + { + "source": { "index": "airline-data" }, + "dest": { "pipeline": "data_frame_simple_pipeline" }, + "pivot": { + "group_by": { + "airline": {"terms": {"field": "airline"}}, + "by-hour": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-dd HH"}}}, + "aggs": { + "avg_response": {"avg": {"field": "responsetime"}} + } + } + } + - match: { preview.0.airline: foo } + - match: { preview.0.by-hour: "2017-02-18 00" } + - match: { preview.0.avg_response: 1.0 } + - match: { preview.0.my_field: 42 } + - match: { preview.1.airline: bar } + - match: { preview.1.by-hour: "2017-02-18 01" } + - match: { preview.1.avg_response: 42.0 } + - match: { preview.1.my_field: 42 } + - match: { preview.2.airline: foo } + - match: { preview.2.by-hour: "2017-02-18 01" } + - match: { preview.2.avg_response: 42.0 } + - match: { preview.2.my_field: 42 } + --- "Test preview transform with invalid config": - do: @@ -127,7 +170,6 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } - --- "Test preview returns bad request with invalid agg": - do: @@ -161,4 +203,21 @@ setup: } } } - +--- +"Test preview with missing pipeline": + - do: + catch: bad_request + data_frame.preview_data_frame_transform: + body: > + { + "source": { "index": "airline-data" }, + "dest": { "pipeline": "missing-pipeline" }, + "pivot": { + "group_by": { + "time": {"date_histogram": {"fixed_interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}}, + "aggs": { + "avg_response": {"avg": {"field": "responsetime"}}, + "time.min": {"min": {"field": "time"}} + } + } + } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index ffba67f879145..c57de030ae74d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -74,7 +74,7 @@ setup: body: > { "source": { "index": "airline-data" }, - "dest": { "index": "airline-data-by-airline-again" }, + "dest": { "index": "airline-data-by-airline-again", "pipeline": "airline-pipeline" }, "pivot": { "group_by": { "airline": {"terms": {"field": "airline"}}}, "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}