Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Data Frame] adds new pipeline field to dest config #43124

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,48 @@
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Configuration containing the destination index for the {@link DataFrameTransformConfig}
*/
public class DestConfig implements ToXContentObject {

public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");

public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("data_frame_config_dest",
true,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String)args[1]));

static {
PARSER.declareString(constructorArg(), INDEX);
PARSER.declareString(optionalConstructorArg(), PIPELINE);
}

private final String index;
private final String pipeline;

public DestConfig(String index) {
DestConfig(String index, String pipeline) {
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}

public String getIndex() {
return index;
}

public String getPipeline() {
return pipeline;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (pipeline != null) {
builder.field(PIPELINE.getPreferredName(), pipeline);
}
builder.endObject();
return builder;
}
Expand All @@ -72,11 +84,45 @@ public boolean equals(Object other) {
}

DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}

@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private String index;
private String pipeline;

/**
* Sets which index to which to write the data
* @param index where to write the data
* @return The {@link Builder} with index set
*/
public Builder setIndex(String index) {
this.index = index;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Objects.requireNonNull(index) here so the error that the constructor will throw is traced to the root cause.

return this;
}

/**
* Sets the pipeline through which the indexed documents should be processed
* @param pipeline The pipeline ID
* @return The {@link Builder} with pipeline set
*/
public Builder setPipeline(String pipeline) {
this.pipeline = pipeline;
return this;
}

public DestConfig build() {
return new DestConfig(index, pipeline);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String
aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregations(aggBuilder).build();

DestConfig destConfig = (destination != null) ? new DestConfig(destination) : null;
DestConfig destConfig = (destination != null) ? DestConfig.builder().setIndex(destination).build() : null;

return DataFrameTransformConfig.builder()
.setId(id)
Expand All @@ -333,7 +333,7 @@ public void testGetStats() throws Exception {
DataFrameTransformConfig transform = DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndex).setQuery(new MatchAllQueryBuilder()).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.setDescription("transform for testing stats")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
public class DestConfigTests extends AbstractXContentTestCase<DestConfig> {

public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
.setIndex("source-index")
.setQueryConfig(queryConfig).build();
// end::put-data-frame-transform-source-config
// tag::put-data-frame-transform-dest-config
DestConfig destConfig = DestConfig.builder()
.setIndex("pivot-destination")
.setPipeline("my-pipeline").build();
// end::put-data-frame-transform-dest-config
// tag::put-data-frame-transform-group-config
GroupConfig groupConfig = GroupConfig.builder()
.groupBy("reviewer", // <1>
Expand All @@ -149,7 +154,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
.builder()
.setId("reviewer-avg-rating") // <1>
.setSource(sourceConfig) // <2>
.setDest(new DestConfig("pivot-destination")) // <3>
.setDest(destConfig) // <3>
.setPivotConfig(pivotConfig) // <4>
.setDescription("This is my test transform") // <5>
.build();
Expand Down Expand Up @@ -222,7 +227,7 @@ public void testStartStop() throws IOException, InterruptedException {
DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder()
.setId("mega-transform")
.setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down Expand Up @@ -344,7 +349,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
DataFrameTransformConfig transformConfig2 = DataFrameTransformConfig.builder()
Expand All @@ -353,7 +358,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest2"))
.setDest(DestConfig.builder().setIndex("pivot-dest2").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down Expand Up @@ -488,7 +493,7 @@ public void testGetStats() throws IOException, InterruptedException {
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();
client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
Expand Down Expand Up @@ -574,7 +579,7 @@ public void testGetDataFrameTransform() throws IOException, InterruptedException
.setIndex("source-data")
.setQuery(new MatchAllQueryBuilder())
.build())
.setDest(new DestConfig("pivot-dest"))
.setDest(DestConfig.builder().setIndex("pivot-dest").build())
.setPivotConfig(pivotConfig)
.build();

Expand Down
12 changes: 11 additions & 1 deletion docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ include-tagged::{doc-tests-file}[{api}-config]
--------------------------------------------------
<1> The {dataframe-transform} ID
<2> The source indices and query from which to gather data
<3> The destination index
<3> The destination index and optional pipeline
<4> The PivotConfig
<5> Optional free text description of the transform

Expand All @@ -49,6 +49,16 @@ If query is not set, a `match_all` query is used by default.
include-tagged::{doc-tests-file}[{api}-source-config]
--------------------------------------------------

==== DestConfig

The index where to write the data and the optional pipeline
through which the docs should be indexed

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-dest-config]
--------------------------------------------------

===== QueryConfig

The query with which to select data from the source.
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/data-frames/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
`source` (required):: (object) The source configuration, consisting of `index` and optionally
a `query`.

`dest` (required):: (object) The destination configuration, consisting of `index`.
`dest` (required):: (object) The destination configuration, consisting of `index` and optionally a
`pipeline` id.

`pivot`:: (object) Defines the pivot function `group by` fields and the aggregation to
reduce the data. See <<data-frame-transform-pivot, data frame transform pivot objects>>.
Expand Down Expand Up @@ -66,7 +67,8 @@ PUT _data_frame/transforms/ecommerce_transform
}
},
"dest": {
"index": "kibana_sample_data_ecommerce_transform"
"index": "kibana_sample_data_ecommerce_transform",
"pipeline": "add_timestamp_pipeline"
},
"pivot": {
"group_by": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -20,49 +21,69 @@
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DestConfig implements Writeable, ToXContentObject {

public static final ParseField INDEX = new ParseField("index");
public static final ParseField PIPELINE = new ParseField("pipeline");

public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);

private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>("data_frame_config_dest",
lenient,
args -> new DestConfig((String)args[0]));
args -> new DestConfig((String)args[0], (String) args[1]));
parser.declareString(constructorArg(), INDEX);
parser.declareString(optionalConstructorArg(), PIPELINE);
return parser;
}

private final String index;
private final String pipeline;

public DestConfig(String index) {
public DestConfig(String index, String pipeline) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
this.pipeline = pipeline;
}

public DestConfig(final StreamInput in) throws IOException {
index = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be adjusted after backport

pipeline = in.readOptionalString();
} else {
pipeline = null;
}
}

public String getIndex() {
return index;
}

public String getPipeline() {
return pipeline;
}

public boolean isValid() {
return index.isEmpty() == false;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be adjusted after backport

out.writeOptionalString(pipeline);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (pipeline != null) {
builder.field(PIPELINE.getPreferredName(), pipeline);
}
builder.endObject();
return builder;
}
Expand All @@ -77,12 +98,13 @@ public boolean equals(Object other) {
}

DestConfig that = (DestConfig) other;
return Objects.equals(index, that.index);
return Objects.equals(index, that.index) &&
Objects.equals(pipeline, that.pipeline);
}

@Override
public int hashCode(){
return Objects.hash(index);
return Objects.hash(index, pipeline);
}

public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected boolean supportsUnknownFields() {
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomSourceConfig(),
new DestConfig("unused-transform-preview-index"),
new DestConfig("unused-transform-preview-index", null),
null, PivotConfigTests.randomPivotConfig(), null);
return new Request(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public class DestConfigTests extends AbstractSerializingDataFrameTestCase<DestCo
private boolean lenient;

public static DestConfig randomDestConfig() {
return new DestConfig(randomAlphaOfLength(10));
return new DestConfig(randomAlphaOfLength(10),
randomBoolean() ? null : randomAlphaOfLength(10));
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected DataFrameTransformConfig createTransformConfig(String id,
return DataFrameTransformConfig.builder()
.setId(id)
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
.setDest(new DestConfig(destinationIndex))
.setDest( DestConfig(destinationIndex, null))
.setPivotConfig(createPivotConfig(groups, aggregations))
.setDescription("Test data frame transform config id: " + id)
.build();
Expand Down
Loading