Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add templating support to pipeline processor. #49030

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Executes another pipeline.
[options="header"]
|======
| Name | Required | Default | Description
| `name` | yes | - | The name of the pipeline to execute
| `name` | yes | - | The name of the pipeline to execute. Supports <<accessing-template-fields,template snippets>>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have these links changed? accessing-template-fields points to common-options and template snippets to the accessing template fields paragraph on the pipeline.asciidoc page but there isn't such a paragraph

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a direct copy from the set processor docs page and that links correctly. So I think it is good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, might be the github navigation that's not able to follow the links correctly.

include::common-options.asciidoc[]
|======

Expand Down
6 changes: 6 additions & 0 deletions modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ dependencies {
compile project(':libs:elasticsearch-grok')
compile project(':libs:elasticsearch-dissect')
}

testClusters.integTest {
// Needed in order to test ingest pipeline templating:
// (this is because the integTest node is not using default distribution, but only the minimal number of required modules)
module file(project(':modules:lang-mustache').tasks.bundlePlugin.archiveFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,97 @@ teardown:
body: {}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }

---
"Test Pipeline Processor with templating":
- do:
ingest.put_pipeline:
id: "engineering-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "john"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "sales-department"
body: >
{
"processors" : [
{
"set" : {
"field": "manager",
"value": "jan"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"processors" : [
{
"pipeline" : {
"name": "{{org}}-department"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "outer"
body: >
{
"org": "engineering"
}

- do:
get:
index: test
id: 1
- match: { _source.manager: "john" }

- do:
index:
index: test
id: 2
pipeline: "outer"
body: >
{
"org": "sales"
}

- do:
get:
index: test
id: 2
- match: { _source.manager: "jan" }

- do:
catch: /illegal_state_exception/
index:
index: test
id: 3
pipeline: "outer"
body: >
{
"org": "legal"
}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
}

//package private for testing
static String getProcessorName(Processor processor){
static String getProcessorName(Processor processor) {
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getInnerProcessor();
Expand All @@ -469,7 +469,7 @@ static String getProcessorName(Processor processor){
sb.append(processor.getType());

if(processor instanceof PipelineProcessor){
String pipelineName = ((PipelineProcessor) processor).getPipelineName();
String pipelineName = ((PipelineProcessor) processor).getPipelineTemplate().newInstance(Map.of()).execute();
sb.append(":");
sb.append(pipelineName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,27 @@

package org.elasticsearch.ingest;

import org.elasticsearch.script.TemplateScript;

import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineProcessor extends AbstractProcessor {

public static final String TYPE = "pipeline";

private final String pipelineName;

private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineName = pipelineName;
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
}

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline != null) {
ingestDocument.executePipeline(pipeline, handler);
Expand All @@ -52,7 +54,8 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}

Pipeline getPipeline(){
Pipeline getPipeline(IngestDocument ingestDocument) {
String pipelineName = ingestDocument.renderTemplate(this.pipelineTemplate);
return ingestService.getPipeline(pipelineName);
}

Expand All @@ -61,8 +64,8 @@ public String getType() {
return TYPE;
}

String getPipelineName() {
return pipelineName;
TemplateScript.Factory getPipelineTemplate() {
return pipelineTemplate;
}

public static final class Factory implements Processor.Factory {
Expand All @@ -76,9 +79,9 @@ public Factory(IngestService ingestService) {
@Override
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String pipeline =
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "name");
return new PipelineProcessor(processorTag, pipeline, ingestService);
TemplateScript.Factory pipelineTemplate =
ConfigurationUtils.readTemplateProperty(TYPE, processorTag, config, "name", ingestService.getScriptService());
return new PipelineProcessor(processorTag, pipelineTemplate, ingestService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public final class TrackingResultProcessor implements Processor {
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (actualProcessor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor);
Pipeline pipeline = pipelineProcessor.getPipeline();
Pipeline pipeline = pipelineProcessor.getPipeline(ingestDocument);
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> {
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(ingestDocument), (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ public void testStatName(){

PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
String pipelineName = randomAlphaOfLength(10);
when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName);
when(pipelineProcessor.getPipelineTemplate()).thenReturn(new TestTemplateService.MockTemplateScript.Factory(pipelineName));
name = PipelineProcessor.TYPE;
when(pipelineProcessor.getType()).thenReturn(name);
assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
Expand All @@ -37,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase {

public void testExecutesPipeline() throws Exception {
String pipelineId = "pipeline";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Pipeline pipeline = new Pipeline(
Expand Down Expand Up @@ -69,7 +70,7 @@ public String getTag() {
}

public void testThrowsOnMissingPipeline() throws Exception {
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
Expand All @@ -85,7 +86,7 @@ public void testThrowsOnMissingPipeline() throws Exception {
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception {

public void testAllowsRepeatedPipelineInvocations() throws Exception {
String innerPipelineId = "inner";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("name", innerPipelineId);
Expand All @@ -131,7 +132,7 @@ public void testPipelineProcessorWithPipelineChain() throws Exception {
String pipeline1Id = "pipeline1";
String pipeline2Id = "pipeline2";
String pipeline3Id = "pipeline3";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);

Map<String, Object> pipeline1ProcessorConfig = new HashMap<>();
Expand Down Expand Up @@ -203,4 +204,11 @@ pipeline3Id, null, null, new CompoundProcessor(
assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L));
assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L));
}

static IngestService createIngestService() {
IngestService ingestService = mock(IngestService.class);
ScriptService scriptService = mock(ScriptService.class);
when(ingestService.getScriptService()).thenReturn(scriptService);
return ingestService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD;
import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD;
import static org.elasticsearch.ingest.PipelineProcessorTests.createIngestService;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -195,7 +195,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception {

public void testActualPipelineProcessor() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down Expand Up @@ -240,7 +240,7 @@ pipelineId, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithTrueConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -308,7 +308,7 @@ pipelineId2, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithFalseConditional() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -377,7 +377,7 @@ public void testActualPipelineProcessorWithHandledFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");

String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down Expand Up @@ -430,7 +430,7 @@ pipelineId, null, null, new CompoundProcessor(
public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig0 = new HashMap<>();
pipelineConfig0.put("name", pipelineId1);
Map<String, Object> pipelineConfig1 = new HashMap<>();
Expand Down Expand Up @@ -462,7 +462,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {

public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = mock(IngestService.class);
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Expand Down