Skip to content

Commit

Permalink
Add support to tag events when parse_json fails to parse (#2745)
Browse files Browse the repository at this point in the history
* Add support to tag events when parse_json fails to parse

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Updated documentation

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed May 24, 2023
1 parent ef260ee commit 5e92474
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 1 deletion.
2 changes: 2 additions & 0 deletions data-prepper-plugins/parse-json-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ The processor will parse the message into the following:
* If the JSON Pointer is invalid then the entire `source` data is parsed into the outgoing `Event`.
* If the pointed-to key already exists in the `Event` and the `destination` is the root, then the entire path of the key will be used.

* `tags_on_failure` (Optional): A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration

## Developer Guide
This plugin is compatible with Java 8 and up. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -38,6 +39,7 @@ public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<
private final String destination;
private final String pointer;
private final String parseWhen;
private final List<String> tagsOnFailure;

private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -51,6 +53,7 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics,
destination = parseJsonProcessorConfig.getDestination();
pointer = parseJsonProcessorConfig.getPointer();
parseWhen = parseJsonProcessorConfig.getParseWhen();
tagsOnFailure = parseJsonProcessorConfig.getTagsOnFailure();
this.expressionEvaluator = expressionEvaluator;
}

Expand Down Expand Up @@ -86,6 +89,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
event.put(destination, parsedJson);
}
} catch (final JsonProcessingException jsonException) {
event.getMetadata().addTags(tagsOnFailure);
LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.validation.constraints.NotBlank;

import java.util.Objects;
import java.util.List;

public class ParseJsonProcessorConfig {
static final String DEFAULT_SOURCE = "message";
Expand All @@ -27,6 +28,9 @@ public class ParseJsonProcessorConfig {
@JsonProperty("parse_when")
private String parseWhen;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

/**
* The field of the Event that contains the JSON data.
*
Expand Down Expand Up @@ -58,6 +62,10 @@ public String getPointer() {
return pointer;
}

public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

public String getParseWhen() { return parseWhen; }

@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
Expand All @@ -67,4 +75,4 @@ boolean isValidDestination() {
final String trimmedDestination = destination.trim();
return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import static org.opensearch.dataprepper.plugins.processor.parsejson.ParseJsonProcessorConfig.DEFAULT_SOURCE;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

import java.util.List;

public class ParseJsonProcessorConfigTest {

private ParseJsonProcessorConfig createObjectUnderTest() {
Expand All @@ -26,6 +28,7 @@ public void test_when_defaultParseJsonProcessorConfig_then_returns_default_value
assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE));
assertThat(objectUnderTest.getDestination(), equalTo(null));
assertThat(objectUnderTest.getPointer(), equalTo(null));
assertThat(objectUnderTest.getTagsOnFailure(), equalTo(null));
}

@Nested
Expand All @@ -50,6 +53,10 @@ void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse(
setField(ParseJsonProcessorConfig.class, config, "destination", " / ");

assertThat(config.isValidDestination(), equalTo(false));
List<String> tagsList = List.of("tag1", "tag2");
setField(ParseJsonProcessorConfig.class, config, "tagsOnFailure", tagsList);

assertThat(config.getTagsOnFailure(), equalTo(tagsList));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static java.util.Map.entry;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -287,6 +288,24 @@ void test_when_condition_skips_processing_when_evaluates_to_false() {

}

@Test
void test_tags_when_json_parse_fails() {
final String source = "different_source";
final String destination = "destination_key";
when(processorConfig.getSource()).thenReturn(source);
when(processorConfig.getDestination()).thenReturn(destination);
final String whenCondition = UUID.randomUUID().toString();
when(processorConfig.getParseWhen()).thenReturn(whenCondition);
List<String> testTags = List.of("tag1", "tag2");
when(processorConfig.getTagsOnFailure()).thenReturn(testTags);
final Record<Event> testEvent = createMessageEvent("{key:}");
when(expressionEvaluator.evaluateConditional(whenCondition, testEvent.getData())).thenReturn(true);
parseJsonProcessor = createObjectUnderTest();

final Event parsedEvent = createAndParseMessageEvent(testEvent);
assertTrue(parsedEvent.getMetadata().hasTags(testTags));
}

private String constructDeeplyNestedJsonPointer(final int numberOfLayers) {
String pointer = "/" + DEEPLY_NESTED_KEY_NAME;
for (int layer = 0; layer < numberOfLayers; layer++) {
Expand Down

0 comments on commit 5e92474

Please sign in to comment.