diff --git a/docs/reference/ingest/processors/pipeline.asciidoc b/docs/reference/ingest/processors/pipeline.asciidoc index 573ab3b88d3c4..7f1ea2885e69a 100644 --- a/docs/reference/ingest/processors/pipeline.asciidoc +++ b/docs/reference/ingest/processors/pipeline.asciidoc @@ -7,7 +7,7 @@ Executes another pipeline. [options="header"] |====== | Name | Required | Default | Description -| `name` | yes | - | The name of the pipeline to execute +| `name` | yes | - | The name of the pipeline to execute. Supports <>. include::common-options.asciidoc[] |====== diff --git a/modules/ingest-common/build.gradle b/modules/ingest-common/build.gradle index ec8f8b7c3717d..6590bdc1c52ef 100644 --- a/modules/ingest-common/build.gradle +++ b/modules/ingest-common/build.gradle @@ -28,3 +28,9 @@ dependencies { compile project(':libs:elasticsearch-grok') compile project(':libs:elasticsearch-dissect') } + +testClusters.integTest { + // Needed in order to test ingest pipeline templating: + // (this is because the integTest node is not using default distribution, but only the minimal number of required modules) + module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile) +} diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 7f4af85cccf91..76dbc180fa0e5 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -108,3 +108,97 @@ teardown: body: {} - match: { error.root_cause.0.type: "ingest_processor_exception" } - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" } + +--- +"Test Pipeline Processor with templating": + - do: + ingest.put_pipeline: + id: "engineering-department" + body: > + { + "processors" : [ + { + "set" : { + "field": "manager", + "value": "john" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "sales-department" + body: > + { + "processors" : [ + { + "set" : { + "field": "manager", + "value": "jan" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "outer" + body: > + { + "processors" : [ + { + "pipeline" : { + "name": "{{org}}-department" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "outer" + body: > + { + "org": "engineering" + } + + - do: + get: + index: test + id: 1 + - match: { _source.manager: "john" } + + - do: + index: + index: test + id: 2 + pipeline: "outer" + body: > + { + "org": "sales" + } + + - do: + get: + index: test + id: 2 + - match: { _source.manager: "jan" } + + - do: + catch: /illegal_state_exception/ + index: + index: test + id: 3 + pipeline: "outer" + body: > + { + "org": "legal" + } + - match: { error.root_cause.0.type: "ingest_processor_exception" } + - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9fba3276c6f50..56b899f068b1c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -460,7 +460,7 @@ public void addIngestClusterStateListener(Consumer listener) { } //package private for testing - static String getProcessorName(Processor processor){ + static String getProcessorName(Processor processor) { // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name if(processor instanceof ConditionalProcessor){ processor = ((ConditionalProcessor) processor).getInnerProcessor(); @@ -469,7 +469,7 @@ static String getProcessorName(Processor processor){ sb.append(processor.getType()); if(processor instanceof PipelineProcessor){ - String pipelineName = ((PipelineProcessor) processor).getPipelineName(); + String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Map.of()).execute(); sb.append(":"); sb.append(pipelineName); } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index f5e37a1c1235e..be02fe24752c1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest; +import org.elasticsearch.script.TemplateScript; + import java.util.Map; import java.util.function.BiConsumer; @@ -26,18 +28,18 @@ public class PipelineProcessor extends AbstractProcessor { public static final String TYPE = "pipeline"; - private final String pipelineName; - + private final TemplateScript.Factory pipelineTemplate; private final IngestService ingestService; - private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) { + private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { super(tag); - this.pipelineName = pipelineName; + this.pipelineTemplate = pipelineTemplate; this.ingestService = ingestService; } @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { + String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); Pipeline pipeline = ingestService.getPipeline(pipelineName); if (pipeline != null) { ingestDocument.executePipeline(pipeline, handler); @@ -52,7 +54,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { throw new UnsupportedOperationException("this method should not get executed"); } - Pipeline getPipeline(){ + Pipeline getPipeline(IngestDocument ingestDocument) { + String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate); return ingestService.getPipeline(pipelineName); } @@ -61,8 +64,8 @@ public String getType() { return TYPE; } - String getPipelineName() { - return pipelineName; + TemplateScript.Factory getPipelineTemplate() { + return pipelineTemplate; } public static final class Factory implements Processor.Factory { @@ -76,9 +79,9 @@ public Factory(IngestService ingestService) { @Override public PipelineProcessor create(Map registry, String processorTag, Map config) throws Exception { - String pipeline = - ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name"); - return new PipelineProcessor(processorTag, pipeline, ingestService); + TemplateScript.Factory pipelineTemplate = + ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService()); + return new PipelineProcessor(processorTag, pipelineTemplate, ingestService); } } } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index edd236c8c4e76..4abaadb353c55 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor { public void execute(IngestDocument ingestDocument, BiConsumer handler) { if (actualProcessor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); - Pipeline pipeline = pipelineProcessor.getPipeline(); + Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument); //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> { + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> { // do nothing, let the tracking processors throw the exception while recording the path up to the failure if (e instanceof ElasticsearchException) { ElasticsearchException elasticsearchException = (ElasticsearchException) e; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 93b1589617ea7..5400956d076c3 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1115,7 +1115,7 @@ public void testStatName(){ PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); String pipelineName = randomAlphaOfLength(10); - when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); + when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName)); name = PipelineProcessor.TYPE; when(pipelineProcessor.getType()).thenReturn(name); assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 4f36727c7ac30..aebcc28e77d5e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; @@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase { public void testExecutesPipeline() throws Exception { String pipelineId = "pipeline"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); CompletableFuture invoked = new CompletableFuture<>(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Pipeline pipeline = new Pipeline( @@ -69,7 +70,7 @@ public String getTag() { } public void testThrowsOnMissingPipeline() throws Exception { - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); @@ -85,7 +86,7 @@ public void testThrowsOnMissingPipeline() throws Exception { public void testThrowsOnRecursivePipelineInvocations() throws Exception { String innerPipelineId = "inner"; String outerPipelineId = "outer"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Map outerConfig = new HashMap<>(); outerConfig.put("name", innerPipelineId); @@ -113,7 +114,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { public void testAllowsRepeatedPipelineInvocations() throws Exception { String innerPipelineId = "inner"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Map outerConfig = new HashMap<>(); outerConfig.put("name", innerPipelineId); @@ -131,7 +132,7 @@ public void testPipelineProcessorWithPipelineChain() throws Exception { String pipeline1Id = "pipeline1"; String pipeline2Id = "pipeline2"; String pipeline3Id = "pipeline3"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map pipeline1ProcessorConfig = new HashMap<>(); @@ -203,4 +204,11 @@ pipeline3Id, null, null, new CompoundProcessor( assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L)); assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); } + + static IngestService createIngestService() { + IngestService ingestService = mock(IngestService.class); + ScriptService scriptService = mock(ScriptService.class); + when(ingestService.getScriptService()).thenReturn(scriptService); + return ingestService; + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index cc9e44e387baf..c66d4742b991b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -40,6 +40,7 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -47,7 +48,6 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -195,7 +195,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { public void testActualPipelineProcessor() throws Exception { String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); @@ -240,7 +240,7 @@ pipelineId, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithTrueConditional() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -308,7 +308,7 @@ pipelineId2, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithFalseConditional() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -377,7 +377,7 @@ public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); @@ -430,7 +430,7 @@ pipelineId, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithCycle() throws Exception { String pipelineId1 = "pipeline1"; String pipelineId2 = "pipeline2"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig0 = new HashMap<>(); pipelineConfig0.put("name", pipelineId1); Map pipelineConfig1 = new HashMap<>(); @@ -462,7 +462,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception { public void testActualPipelineProcessorRepeatedInvocation() throws Exception { String pipelineId = "pipeline1"; - IngestService ingestService = mock(IngestService.class); + IngestService ingestService = createIngestService(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put("name", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);