From 6f787696b140febc3a04ff72cf845b33befb75d8 Mon Sep 17 00:00:00 2001 From: Kat Shen Date: Fri, 25 Aug 2023 00:22:06 +0000 Subject: [PATCH 1/4] readme, config done, main code integration in progress Signed-off-by: Kat Shen --- data-prepper-plugins/key-value-processor/README.md | 3 +++ .../plugins/processor/keyvalue/KeyValueProcessor.java | 3 +++ .../processor/keyvalue/KeyValueProcessorConfig.java | 8 ++++++++ 3 files changed, 14 insertions(+) diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index f787284fbb..f27e377584 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -101,6 +101,9 @@ When run, the processor will parse the message into the following output: * `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event. * Default: `true` +* `tag_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided value. + * Default: `["keyvalueprocessor_failure"]` + ## Developer Guide This plugin is compatible with Java 14. See - [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 9a347ad9fd..fa1192035d 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -56,12 +56,15 @@ public class KeyValueProcessor extends AbstractProcessor, Record validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict); final String delimiterBracketCheck = "[\\[\\]()<>]"; private final Set bracketSet = Set.of('[', ']', '(', ')', '<', '>'); + private final List tagOnFailure; @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; + tagOnFailure = keyValueProcessorConfig.getTagOnFailure(); + if (keyValueProcessorConfig.getFieldDelimiterRegex() != null && !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) { if (keyValueProcessorConfig.getFieldSplitCharacters() != null diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index fb7e9abdad..d7a2519776 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -30,6 +30,7 @@ public class KeyValueProcessorConfig { static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false; static final boolean DEFAULT_REMOVE_BRACKETS = false; static final boolean DEFAULT_RECURSIVE = false; + static final List DEFAULT_TAG_ON_FAILURE = new ArrayList<>(Arrays.asList("keyvalueprocessor_failure")); @NotEmpty private String source = DEFAULT_SOURCE; @@ -95,6 +96,9 @@ public class KeyValueProcessorConfig { @NotNull private boolean recursive = DEFAULT_RECURSIVE; + @JsonProperty("tag_on_failure") + private List tagOnFailure = DEFAULT_TAG_ON_FAILURE; + @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; @@ -170,6 +174,10 @@ public boolean getRecursive() { return recursive; } + public List getTagOnFailure() { + return tagOnFailure; + } + public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } From 6121d69196980977e94ddca329184f165a85ef22 Mon Sep 17 00:00:00 2001 From: Kat Shen Date: Fri, 25 Aug 2023 19:56:55 +0000 Subject: [PATCH 2/4] clarify readme with example output Signed-off-by: Kat Shen --- data-prepper-plugins/key-value-processor/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index f27e377584..e26173a7e3 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -103,6 +103,7 @@ When run, the processor will parse the message into the following output: * `tag_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided value. * Default: `["keyvalueprocessor_failure"]` + * Example: in the case of a runtime exception, the output will be `{"message": "some input message", "tags": ["keyvalueprocessor_failure"]}` ## Developer Guide This plugin is compatible with Java 14. See From d4c9740120f87ce2c00ccd1047346bf220da9b17 Mon Sep 17 00:00:00 2001 From: Kat Shen Date: Fri, 25 Aug 2023 20:21:23 +0000 Subject: [PATCH 3/4] add import statement Signed-off-by: Kat Shen --- .../plugins/processor/keyvalue/KeyValueProcessorConfig.java | 1 + 1 file changed, 1 insertion(+) diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index d7a2519776..de5f62b090 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -10,6 +10,7 @@ import jakarta.validation.constraints.NotNull; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; From 49b089ec2517dfbcad10fb583544db6dd9aeaaeb Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Wed, 20 Sep 2023 11:52:08 -0500 Subject: [PATCH 4/4] Add tagging on failure Signed-off-by: Hai Yan --- .../key-value-processor/README.md | 5 ++-- .../processor/keyvalue/KeyValueProcessor.java | 26 ++++++++++++------- .../keyvalue/KeyValueProcessorConfig.java | 10 +++---- .../keyvalue/KeyValueProcessorTests.java | 14 ++++++++++ 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index e26173a7e3..828d331f7e 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -101,9 +101,8 @@ When run, the processor will parse the message into the following output: * `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event. * Default: `true` -* `tag_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided value. - * Default: `["keyvalueprocessor_failure"]` - * Example: in the case of a runtime exception, the output will be `{"message": "some input message", "tags": ["keyvalueprocessor_failure"]}` +* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags. + * Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata. ## Developer Guide This plugin is compatible with Java 14. See diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index fa1192035d..a062e90e48 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -56,14 +56,14 @@ public class KeyValueProcessor extends AbstractProcessor, Record validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict); final String delimiterBracketCheck = "[\\[\\]()<>]"; private final Set bracketSet = Set.of('[', ']', '(', ')', '<', '>'); - private final List tagOnFailure; + private final List tagsOnFailure; @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; - tagOnFailure = keyValueProcessorConfig.getTagOnFailure(); + tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure(); if (keyValueProcessorConfig.getFieldDelimiterRegex() != null && !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) { @@ -97,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (keyValueProcessorConfig.getRecursive() && fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) { - throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse."); + throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse."); } } @@ -214,8 +214,8 @@ private boolean validateRegex(final String pattern) } private void validateKeySets(final Set includeSet, final Set excludeSet, final Set defaultSet) { - final Set includeIntersectionSet = new HashSet(includeSet); - final Set defaultIntersectionSet = new HashSet(defaultSet); + final Set includeIntersectionSet = new HashSet<>(includeSet); + final Set defaultIntersectionSet = new HashSet<>(defaultSet); includeIntersectionSet.retainAll(excludeSet); if (!includeIntersectionSet.isEmpty()) { @@ -232,7 +232,7 @@ private void validateKeySets(final Set includeSet, final Set exc public Collection> doExecute(final Collection> records) { final ObjectMapper mapper = new ObjectMapper(); - for(final Record record : records) { + for (final Record record : records) { final Map outputMap = new HashMap<>(); final Event recordEvent = record.getData(); final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); @@ -243,10 +243,16 @@ public Collection> doExecute(final Collection> recor JsonNode recursedTree = recurse(groupsRaw, mapper); outputMap.putAll(createRecursedMap(recursedTree, mapper)); } catch (Exception e) { - LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive"); + LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); + recordEvent.getMetadata().addTags(tagsOnFailure); } } else { - outputMap.putAll(createNonRecursedMap(groups)); + try { + outputMap.putAll(createNonRecursedMap(groups)); + } catch (Exception e) { + LOG.error("Non-recursive parsing ran into an unexpected error", e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } } final Map processedMap = executeConfigs(outputMap); @@ -278,11 +284,11 @@ public void shutdown() { } private ObjectNode recurse(final String input, final ObjectMapper mapper) { - Stack bracketStack = new Stack(); + Stack bracketStack = new Stack<>(); Map bracketMap = initBracketMap(); int pairStart = 0; - ArrayList pairs = new ArrayList(); + ArrayList pairs = new ArrayList<>(); ObjectNode root = mapper.createObjectNode(); for (int i = 0; i < input.length(); i++) { diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index de5f62b090..4e3344483b 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -10,7 +10,6 @@ import jakarta.validation.constraints.NotNull; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -31,7 +30,6 @@ public class KeyValueProcessorConfig { static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false; static final boolean DEFAULT_REMOVE_BRACKETS = false; static final boolean DEFAULT_RECURSIVE = false; - static final List DEFAULT_TAG_ON_FAILURE = new ArrayList<>(Arrays.asList("keyvalueprocessor_failure")); @NotEmpty private String source = DEFAULT_SOURCE; @@ -97,8 +95,8 @@ public class KeyValueProcessorConfig { @NotNull private boolean recursive = DEFAULT_RECURSIVE; - @JsonProperty("tag_on_failure") - private List tagOnFailure = DEFAULT_TAG_ON_FAILURE; + @JsonProperty("tags_on_failure") + private List tagsOnFailure; @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; @@ -175,8 +173,8 @@ public boolean getRecursive() { return recursive; } - public List getTagOnFailure() { - return tagOnFailure; + public List getTagsOnFailure() { + return tagsOnFailure; } public boolean getOverwriteIfDestinationExists() { diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 66d9c6f7ed..1ebb910895 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -814,6 +814,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() { assertThatKeyEquals(parsed_message, "item1-subitem1", "default"); } + @Test + void testTagsAddedWhenParsingFails() { + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2")); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[]"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(0)); + assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true)); + } + @Test void testShutdownIsReady() { assertThat(keyValueProcessor.isReadyForShutdown(), is(true));