Skip to content

Commit

Permalink
[Transform] fail to start/put on missing pipeline (elastic#50701)
Browse files Browse the repository at this point in the history
If a pipeline referenced by a transform does not exist, we should not allow the transform to be created. 

We do allow the pipeline existence check to be skipped with defer_validations, but if the pipeline still does not exist on `_start`, the pipeline will fail to start.

relates:  elastic#50135
  • Loading branch information
benwtrent committed Jan 9, 2020
1 parent 71afeec commit d440108
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public void testPutTransform() throws IOException, InterruptedException {
.setIndex("pivot-destination")
.setPipeline("my-pipeline").build();
// end::put-transform-dest-config
destConfig = DestConfig.builder().setIndex("pivot-destination").build();
// tag::put-transform-group-config
GroupConfig groupConfig = GroupConfig.builder()
.groupBy("reviewer", // <1>
Expand Down
18 changes: 17 additions & 1 deletion docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,23 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = '''
number_of_shards: 1
number_of_replicas: 0
'''
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + '''
buildRestTests.setups['add_timestamp_pipeline'] = '''
- do:
ingest.put_pipeline:
id: "add_timestamp_pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "@timestamp",
"value" : "{{_ingest.timestamp}}"
}
}
]
}
'''
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + buildRestTests.setups['add_timestamp_pipeline'] + '''
- do:
raw:
method: PUT
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/transform/apis/put-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ PUT _transform/ecommerce_transform
}
}
--------------------------------------------------
// TEST[setup:kibana_sample_data_ecommerce]
// TEST[setup:kibana_sample_data_ecommerce,add_timestamp_pipeline]

When the {transform} is created, you receive the following results:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class TransformMessages {
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";

public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ setup:
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "airline-pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "some_field",
"value" : 42
}
}
]
}
- match: { acknowledged: true }

- do:
transform.put_transform:
transform_id: "airline-transform-dos"
Expand Down Expand Up @@ -631,3 +647,36 @@ setup:
transform_id: "airline-transform-start-delete"
force: true
- match: { acknowledged: true }
---
"Test put transform with missing pipeline":
- do:
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
transform.put_transform:
transform_id: "airline-transform-with-missing-pipeline-crud"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-with-pipeline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
---
"Test put transform with missing pipeline and defer validations":
- do:
transform.put_transform:
defer_validation: true
transform_id: "airline-transform-with-missing-pipeline-crud-defer"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,59 @@ teardown:
index: airline-data-time-alias
- match: { airline-data-time-alias.mappings.properties.time.type: date }
- match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
---
"Test start transform with missing pipeline":
- do:
transform.put_transform:
defer_validation: true
transform_id: "airline-transform-with-missing-pipeline"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}
- do:
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
transform.start_transform:
transform_id: "airline-transform-with-missing-pipeline"
---
"Test start transform with pipeline":
- do:
ingest.put_pipeline:
id: "transform-pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "some_field",
"value" : 42
}
}
]
}
- match: { acknowledged: true }

- do:
transform.put_transform:
transform_id: "airline-transform-with-pipeline"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}
- do:
transform.start_transform:
transform_id: "airline-transform-with-pipeline"
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
private final SecurityContext securityContext;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final IngestService ingestService;

@Inject
public TransportPutTransformAction(
Expand All @@ -84,7 +86,8 @@ public TransportPutTransformAction(
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
this(
PutTransformAction.NAME,
Expand All @@ -96,7 +99,8 @@ public TransportPutTransformAction(
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}

Expand All @@ -110,7 +114,8 @@ protected TransportPutTransformAction(
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
name,
Expand All @@ -137,6 +142,7 @@ protected TransportPutTransformAction(
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.ingestService = ingestService;
}

static HasPrivilegesRequest buildPrivilegeCheck(
Expand Down Expand Up @@ -335,6 +341,16 @@ private void putTransform(Request request, ActionListener<AcknowledgedResponse>
if (request.isDeferValidation()) {
pivotValidationListener.onResponse(true);
} else {
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
private final Client client;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final IngestService ingestService;

@Inject
public TransportStartTransformAction(
Expand All @@ -82,7 +84,8 @@ public TransportStartTransformAction(
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
this(
StartTransformAction.NAME,
Expand All @@ -95,7 +98,8 @@ public TransportStartTransformAction(
transformServices,
persistentTasksService,
client,
settings
settings,
ingestService
);
}

Expand All @@ -110,7 +114,8 @@ protected TransportStartTransformAction(
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
super(
name,
Expand All @@ -135,6 +140,7 @@ protected TransportStartTransformAction(
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.ingestService = ingestService;
}

@Override
Expand Down Expand Up @@ -256,6 +262,16 @@ protected void masterOperation(
}
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
transformConfigHolder.set(config);
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}

sourceDestValidator.validate(
clusterService.state(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -31,7 +32,8 @@ public TransportPutTransformActionDeprecated(
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
PutTransformActionDeprecated.NAME,
Expand All @@ -43,7 +45,8 @@ public TransportPutTransformActionDeprecated(
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -33,7 +34,8 @@ public TransportStartTransformActionDeprecated(
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
super(
StartTransformActionDeprecated.NAME,
Expand All @@ -46,7 +48,8 @@ public TransportStartTransformActionDeprecated(
transformServices,
persistentTasksService,
client,
settings
settings,
ingestService
);
}
}

0 comments on commit d440108

Please sign in to comment.