Skip to content

Commit

Permalink
Add tagging on failure
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <oeyh@amazon.com>
  • Loading branch information
oeyh committed Sep 20, 2023
1 parent 8105c2c commit d674f12
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 19 deletions.
5 changes: 2 additions & 3 deletions data-prepper-plugins/key-value-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ When run, the processor will parse the message into the following output:
* While `recursive` is `true`, `skip_duplicate_values` will always be `true`.
* While `recursive` is `true`, `whitespace` will always be `"strict"`.

* `tag_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided value.
* Default: `["keyvalueprocessor_failure"]`
* Example: in the case of a runtime exception, the output will be `{"message": "some input message", "tags": ["keyvalueprocessor_failure"]}`
* `tags_on_failure` - When a kv operation causes a runtime exception to be thrown within the processor, the operation is safely aborted without crashing the processor, and the event is tagged with the provided tags.
* Example: if `tags_on_failure` is set to `["keyvalueprocessor_failure"]`, in the case of a runtime exception, `{"tags": ["keyvalueprocessor_failure"]}` will be added to the event's metadata.

## Developer Guide
This plugin is compatible with Java 14. See
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
private final Set<String> validWhitespaceSet = Set.of(whitespaceLenient, whitespaceStrict);
final String delimiterBracketCheck = "[\\[\\]()<>]";
private final Set<Character> bracketSet = Set.of('[', ']', '(', ')', '<', '>');
private final List<String> tagOnFailure;
private final List<String> tagsOnFailure;

@DataPrepperPluginConstructor
public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProcessorConfig keyValueProcessorConfig) {
super(pluginMetrics);
this.keyValueProcessorConfig = keyValueProcessorConfig;

tagOnFailure = keyValueProcessorConfig.getTagOnFailure();
tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure();

if (keyValueProcessorConfig.getFieldDelimiterRegex() != null
&& !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) {
Expand Down Expand Up @@ -97,7 +97,7 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics, final KeyValueProces

if (keyValueProcessorConfig.getRecursive()
&& fieldDelimiterPattern.matcher(delimiterBracketCheck).matches()) {
throw new IllegalArgumentException("While recursive is true, the set field split characters cannot contain brackets while you are trying to recurse.");
throw new IllegalArgumentException("While recursive is true, the set field delimiter cannot contain brackets while you are trying to recurse.");
}
}

Expand Down Expand Up @@ -214,8 +214,8 @@ private boolean validateRegex(final String pattern)
}

private void validateKeySets(final Set<String> includeSet, final Set<String> excludeSet, final Set<String> defaultSet) {
final Set<String> includeIntersectionSet = new HashSet<String>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<String>(defaultSet);
final Set<String> includeIntersectionSet = new HashSet<>(includeSet);
final Set<String> defaultIntersectionSet = new HashSet<>(defaultSet);

includeIntersectionSet.retainAll(excludeSet);
if (!includeIntersectionSet.isEmpty()) {
Expand All @@ -232,7 +232,7 @@ private void validateKeySets(final Set<String> includeSet, final Set<String> exc
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final ObjectMapper mapper = new ObjectMapper();

for(final Record<Event> record : records) {
for (final Record<Event> record : records) {
final Map<String, Object> outputMap = new HashMap<>();
final Event recordEvent = record.getData();
final String groupsRaw = recordEvent.get(keyValueProcessorConfig.getSource(), String.class);
Expand All @@ -243,10 +243,16 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
JsonNode recursedTree = recurse(groupsRaw, mapper);
outputMap.putAll(createRecursedMap(recursedTree, mapper));
} catch (Exception e) {
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive");
LOG.error("Recursive parsing ran into an unexpected error, treating message as non-recursive", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
} else {
outputMap.putAll(createNonRecursedMap(groups));
try {
outputMap.putAll(createNonRecursedMap(groups));
} catch (Exception e) {
LOG.error("Non-recursive parsing ran into an unexpected error", e);
recordEvent.getMetadata().addTags(tagsOnFailure);
}
}

final Map<String, Object> processedMap = executeConfigs(outputMap);
Expand All @@ -258,11 +264,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

private ObjectNode recurse(final String input, final ObjectMapper mapper) {
Stack<Character> bracketStack = new Stack<Character>();
Stack<Character> bracketStack = new Stack<>();
Map<Character, Character> bracketMap = initBracketMap();
int pairStart = 0;

ArrayList<String> pairs = new ArrayList<String>();
ArrayList<String> pairs = new ArrayList<>();
ObjectNode root = mapper.createObjectNode();

for (int i = 0; i < input.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import jakarta.validation.constraints.NotNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand All @@ -31,7 +30,6 @@ public class KeyValueProcessorConfig {
static final boolean DEFAULT_SKIP_DUPLICATE_VALUES = false;
static final boolean DEFAULT_REMOVE_BRACKETS = false;
static final boolean DEFAULT_RECURSIVE = false;
static final List<String> DEFAULT_TAG_ON_FAILURE = new ArrayList<>(Arrays.asList("keyvalueprocessor_failure"));

@NotEmpty
private String source = DEFAULT_SOURCE;
Expand Down Expand Up @@ -98,8 +96,8 @@ public class KeyValueProcessorConfig {
@NotNull
private boolean recursive = DEFAULT_RECURSIVE;

@JsonProperty("tag_on_failure")
private List<String> tagOnFailure = DEFAULT_TAG_ON_FAILURE;
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

public String getSource() {
return source;
Expand Down Expand Up @@ -173,7 +171,7 @@ public boolean getRecursive() {
return recursive;
}

public List<String> getTagOnFailure() {
return tagOnFailure;
public List<String> getTagsOnFailure() {
return tagsOnFailure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,20 @@ void testDefaultInnerKeyRecursiveKvProcessor() {
assertThatKeyEquals(parsed_message, "item1-subitem1", "default");
}

@Test
void testTagsAddedWhenParsingFails() {
when(mockConfig.getRecursive()).thenReturn(true);
when(mockConfig.getTagsOnFailure()).thenReturn(List.of("tag1", "tag2"));
keyValueProcessor = new KeyValueProcessor(pluginMetrics, mockConfig);

final Record<Event> record = getMessage("item1=[]");
final List<Record<Event>> editedRecords = (List<Record<Event>>) keyValueProcessor.doExecute(Collections.singletonList(record));
final LinkedHashMap<String, Object> parsed_message = getLinkedHashMap(editedRecords);

assertThat(parsed_message.size(), equalTo(0));
assertThat(record.getData().getMetadata().hasTags(List.of("tag1", "tag2")), is(true));
}

@Test
void testShutdownIsReady() {
assertThat(keyValueProcessor.isReadyForShutdown(), is(true));
Expand Down

0 comments on commit d674f12

Please sign in to comment.