diff --git a/data-prepper-plugins/key-value-processor/README.md b/data-prepper-plugins/key-value-processor/README.md index f787284fbb..828d331f7e 100644 --- a/data-prepper-plugins/key-value-processor/README.md +++ b/data-prepper-plugins/key-value-processor/README.md @@ -101,6 +101,9 @@ When run, the processor will parse the message into the following output: * `overwrite_if_destination_exists` - Specify whether to overwrite existing fields if there are key conflicts when writing parsed fields to the event. * Default: `true` +* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags. + * Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata. + ## Developer Guide This plugin is compatible with Java 14. See - [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java index 9a347ad9fd..a062e90e48 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java @@ -56,12 +56,15 @@ public class KeyValueProcessor extends AbstractProcessor, Record validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict); final String delimiterBracketCheck = "[\\[\\]()<>]"; private final Set bracketSet = Set.of('[', ']', '(', ')', '<', '>'); + private final List tagsOnFailure; @DataPrepperPluginConstructor public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) { super(pluginMetrics); this.keyValueProcessorConfig = keyValueProcessorConfig; + tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure(); + if (keyValueProcessorConfig.getFieldDelimiterRegex() != null && !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) { if (keyValueProcessorConfig.getFieldSplitCharacters() != null @@ -94,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces if (keyValueProcessorConfig.getRecursive() && fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) { - throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse."); + throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse."); } } @@ -211,8 +214,8 @@ private boolean validateRegex(final String pattern) } private void validateKeySets(final Set includeSet, final Set excludeSet, final Set defaultSet) { - final Set includeIntersectionSet = new HashSet(includeSet); - final Set defaultIntersectionSet = new HashSet(defaultSet); + final Set includeIntersectionSet = new HashSet<>(includeSet); + final Set defaultIntersectionSet = new HashSet<>(defaultSet); includeIntersectionSet.retainAll(excludeSet); if (!includeIntersectionSet.isEmpty()) { @@ -229,7 +232,7 @@ private void validateKeySets(final Set includeSet, final Set exc public Collection> doExecute(final Collection> records) { final ObjectMapper mapper = new ObjectMapper(); - for(final Record record : records) { + for (final Record record : records) { final Map outputMap = new HashMap<>(); final Event recordEvent = record.getData(); final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class); @@ -240,10 +243,16 @@ public Collection> doExecute(final Collection> recor JsonNode recursedTree = recurse(groupsRaw, mapper); outputMap.putAll(createRecursedMap(recursedTree, mapper)); } catch (Exception e) { - LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive"); + LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e); + recordEvent.getMetadata().addTags(tagsOnFailure); } } else { - outputMap.putAll(createNonRecursedMap(groups)); + try { + outputMap.putAll(createNonRecursedMap(groups)); + } catch (Exception e) { + LOG.error("Non-recursive parsing ran into an unexpected error", e); + recordEvent.getMetadata().addTags(tagsOnFailure); + } } final Map processedMap = executeConfigs(outputMap); @@ -275,11 +284,11 @@ public void shutdown() { } private ObjectNode recurse(final String input, final ObjectMapper mapper) { - Stack bracketStack = new Stack(); + Stack bracketStack = new Stack<>(); Map bracketMap = initBracketMap(); int pairStart = 0; - ArrayList pairs = new ArrayList(); + ArrayList pairs = new ArrayList<>(); ObjectNode root = mapper.createObjectNode(); for (int i = 0; i < input.length(); i++) { diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index fb7e9abdad..4e3344483b 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -95,6 +95,9 @@ public class KeyValueProcessorConfig { @NotNull private boolean recursive = DEFAULT_RECURSIVE; + @JsonProperty("tags_on_failure") + private List tagsOnFailure; + @JsonProperty("overwrite_if_destination_exists") private boolean overwriteIfDestinationExists = true; @@ -170,6 +173,10 @@ public boolean getRecursive() { return recursive; } + public List getTagsOnFailure() { + return tagsOnFailure; + } + public boolean getOverwriteIfDestinationExists() { return overwriteIfDestinationExists; } diff --git a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java index 66d9c6f7ed..1ebb910895 100644 --- a/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java +++ b/data-prepper-plugins/key-value-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorTests.java @@ -814,6 +814,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() { assertThatKeyEquals(parsed_message, "item1-subitem1", "default"); } + @Test + void testTagsAddedWhenParsingFails() { + when(mockConfig.getRecursive()).thenReturn(true); + when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2")); + keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig); + + final Record record = getMessage("item1=[]"); + final List> editedRecords = (List>) keyValueProcessor.doExecute(Collections.singletonList(record)); + final LinkedHashMap parsed_message = getLinkedHashMap(editedRecords); + + assertThat(parsed_message.size(), equalTo(0)); + assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true)); + } + @Test void testShutdownIsReady() { assertThat(keyValueProcessor.isReadyForShutdown(), is(true));