Skip to content

Commit

Permalink
Add templating support to pipeline processor. (elastic#49030)
Browse files Browse the repository at this point in the history
This commit adds templating support to the pipeline processor's `name` option.

Closes elastic#39955
  • Loading branch information
martijnvg committed Nov 27, 2019
1 parent 4b6915e commit 634d6c9
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 28 deletions.
2 changes: 1 addition & 1 deletion docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Executes another pipeline.
[options="header"]
|======
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute
| `name` | yes | - | The name of the pipeline to execute. Supports <<accessing-template-fields,template snippets>>.
include::common-options.asciidoc[]
|======

Expand Down
6 changes: 6 additions & 0 deletions modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ dependencies {
compile project(':libs:elasticsearch-grok')
compile project(':libs:elasticsearch-dissect')
}

testClusters.integTest {
// Needed in order to test ingest pipeline templating:
// (this is because the integTest node is not using default distribution, but only the minimal number of required modules)
module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,97 @@ teardown:
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }

---
"Test Pipeline Processor with templating":
- do:
ingest.put_pipeline:
id: "engineering-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "john"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "sales-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "jan"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "{{org}}-department"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "outer"
body: >
{
"org": "engineering"
}
- do:
get:
index: test
id: 1
- match: { _source.manager: "john" }

- do:
index:
index: test
id: 2
pipeline: "outer"
body: >
{
"org": "sales"
}
- do:
get:
index: test
id: 2
- match: { _source.manager: "jan" }

- do:
catch: /illegal_state_exception/
index:
index: test
id: 3
pipeline: "outer"
body: >
{
"org": "legal"
}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
}

//package private for testing
static String getProcessorName(Processor processor){
static String getProcessorName(Processor processor) {
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getInnerProcessor();
Expand All @@ -470,7 +470,7 @@ static String getProcessorName(Processor processor){
sb.append(processor.getType());

if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Map.of()).execute();
sb.append(":");
sb.append(pipelineName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@

package org.elasticsearch.ingest;

import org.elasticsearch.script.TemplateScript;

import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineProcessor extends AbstractProcessor {

public static final String TYPE = "pipeline";

private final String pipelineName;

private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineName = pipelineName;
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
Expand All @@ -52,7 +54,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}

Pipeline getPipeline(){
Pipeline getPipeline(IngestDocument ingestDocument) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
return ingestService.getPipeline(pipelineName);
}

Expand All @@ -61,8 +64,8 @@ public String getType() {
return TYPE;
}

String getPipelineName() {
return pipelineName;
TemplateScript.Factory getPipelineTemplate() {
return pipelineTemplate;
}

public static final class Factory implements Processor.Factory {
Expand All @@ -76,9 +79,9 @@ public Factory(IngestService ingestService) {
@Override
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String pipeline =
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
return new PipelineProcessor(processorTag, pipeline, ingestService);
TemplateScript.Factory pipelineTemplate =
ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService());
return new PipelineProcessor(processorTag, pipelineTemplate, ingestService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline();
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ public void testStatName(){

PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName));
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
Expand All @@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase {

public void testExecutesPipeline() throws Exception {
String pipelineId = "pipeline";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Pipeline pipeline = new Pipeline(
Expand Down Expand Up @@ -69,7 +70,7 @@ public String getTag() {
}

public void testThrowsOnMissingPipeline() throws Exception {
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
Expand All @@ -85,7 +86,7 @@ public void testThrowsOnMissingPipeline() throws Exception {
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception {

public void testAllowsRepeatedPipelineInvocations() throws Exception {
String innerPipelineId = "inner";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand All @@ -131,7 +132,7 @@ public void testPipelineProcessorWithPipelineChain() throws Exception {
String pipeline1Id = "pipeline1";
String pipeline2Id = "pipeline2";
String pipeline3Id = "pipeline3";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);

Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
Expand Down Expand Up @@ -203,4 +204,11 @@ pipeline3Id, null, null, new CompoundProcessor(
assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}

static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);
when(ingestService.getScriptService()).thenReturn(scriptService);
return ingestService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -195,7 +195,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception {

public void testActualPipelineProcessor() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down Expand Up @@ -240,7 +240,7 @@ pipelineId, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithTrueConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -308,7 +308,7 @@ pipelineId2, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithFalseConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -377,7 +377,7 @@ public void testActualPipelineProcessorWithHandledFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");

String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down Expand Up @@ -430,7 +430,7 @@ pipelineId, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -462,7 +462,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {

public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down

0 comments on commit 634d6c9

Please sign in to comment.