Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address missing processor JsonPropertyDescriptions and validations #4837

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
Expand Down Expand Up @@ -78,6 +79,10 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
this.localMode = aggregateProcessorConfig.getLocalMode();

pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize);

if (aggregateProcessorConfig.getWhenCondition() != null && (!expressionEvaluator.isValidExpressionStatement(aggregateProcessorConfig.getWhenCondition()))) {
Copy link
Collaborator

@oeyh oeyh Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me. Are we adding this validation to all processors now to fail fast on invalid expressions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's right

throw new InvalidPluginConfigurationException("aggregate_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax");
}
}

private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -28,7 +28,7 @@ public class AggregateProcessorConfig {
@JsonProperty("group_duration")
private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS);

@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided, or you can create custom aggregate actions. remove_duplicates and put_all are the available actions. For more information, see Creating New Aggregate Actions.")
@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided.")
@JsonProperty("action")
@NotNull
private PluginModel aggregateAction;
Expand All @@ -46,7 +46,7 @@ public class AggregateProcessorConfig {
@JsonProperty("aggregated_events_tag")
private String aggregatedEventsTag;

@JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.")
@JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.")
@JsonProperty("aggregate_when")
private String whenCondition;

Expand Down Expand Up @@ -74,7 +74,7 @@ public Boolean getLocalMode() {
return localMode;
}

@AssertTrue(message="Aggragated Events Tag must be set when output_unaggregated_events is set")
@AssertTrue(message="Aggregated Events Tag must be set when output_unaggregated_events is set")
boolean isValidConfig() {
return (!outputUnaggregatedEvents || (outputUnaggregatedEvents && aggregatedEventsTag != null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

public class AppendAggregateActionConfig {

@JsonPropertyDescription("List of keys to append.")
@JsonProperty("keys_to_append")
@JsonPropertyDescription("A list of keys to append to for the aggregated result.")
List<String> keysToAppend;

public List<String> getKeysToAppend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotNull;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;

public class PercentSamplerAggregateActionConfig {
@JsonPropertyDescription("Percent value of the sampling to be done. 0.0 < percent < 100.0")
@JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0")
@JsonProperty("percent")
@NotNull
private double percent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class TailSamplerAggregateActionConfig {
@NotNull
private Integer percent;

@JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not")
@JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not")
@JsonProperty("condition")
private String condition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException {
String condition = "/firstRandomNumber < 100";
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
int count = 0;
for (Record<Event> record: eventBatch) {
Event event = record.getData();
Expand Down Expand Up @@ -410,6 +411,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio
final String condition = "/firstRandomNumber < 100";
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
int count = 0;
eventBatch = getBatchOfEvents(true);
for (Record<Event> record: eventBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -41,6 +42,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -152,6 +154,16 @@ void setUp() {
when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(timeElapsed);
}

@Test
void invalid_aggregate_when_statement_throws_InvalidPluginConfigurationException() {
final String whenCondition = UUID.randomUUID().toString();
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(whenCondition);

when(expressionEvaluator.isValidExpressionStatement(whenCondition)).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void getIdentificationKeys_should_return_configured_identification_keys() {
final List<String> keys = List.of("key");
Expand Down Expand Up @@ -218,6 +230,7 @@ void handleEvent_returning_with_condition_eliminates_one_record() {
when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
Expand Down Expand Up @@ -280,6 +293,7 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
when(identificationKeysHasher.createIdentificationKeysMapFromEvent(firstEvent))
.thenReturn(identificationKeysMap);
when(aggregateActionSynchronizer.handleEventForGroup(firstEvent, identificationKeysMap, aggregateGroup)).thenReturn(firstAggregateActionResponse);
when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class CsvProcessorConfig {
private List<String> columnNames;

@JsonProperty("csv_when")
@JsonPropertyDescription("Allows you to specify a [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong but I thought we want to keep this markdown format here so it will be rendered in the documentation website. @chenqi0805

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't rendering the documentation website from this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not today, but we are planning to do that, see this: opensearch-project/documentation-website#7651

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. We plan to generate the config table from those annotations

@JsonPropertyDescription("Allows you to specify a Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
"such as `/some-key == \"test\"`, that will be evaluated to determine whether " +
"the processor should be applied to the event.")
private String csvWhen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -63,6 +64,10 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date

if (dateProcessorConfig.getMatch() != null)
extractKeyAndFormatters();

if (dateProcessorConfig.getDateWhen() != null && (!expressionEvaluator.isValidExpressionStatement(dateProcessorConfig.getDateWhen()))) {
throw new InvalidPluginConfigurationException("date_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static class DateMatch {
@JsonProperty("patterns")
@JsonPropertyDescription("A list of possible patterns that the timestamp value of the key can have. The patterns " +
"are based on a sequence of letters and symbols. The `patterns` support all the patterns listed in the " +
"Java [DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " +
"Java DateTimeFormatter (https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) reference. " +
"The timestamp value also supports `epoch_second`, `epoch_milli`, and `epoch_nano` values, " +
"which represent the timestamp as the number of seconds, milliseconds, and nanoseconds since the epoch. " +
"Epoch values always use the UTC time zone.")
Expand All @@ -54,6 +54,7 @@ public List<String> getPatterns() {
}

@JsonIgnore
@AssertTrue
public boolean isValidPatterns() {
// For now, allow only one of the three "epoch_" pattern
int count = 0;
Expand Down Expand Up @@ -119,23 +120,23 @@ public static boolean isValidPattern(final String pattern) {
@JsonProperty("source_timezone")
@JsonPropertyDescription("The time zone used to parse dates, including when the zone or offset cannot be extracted " +
"from the value. If the zone or offset are part of the value, then the time zone is ignored. " +
"A list of all the available time zones is contained in the **TZ database name** column of " +
"[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
"A list of all the available time zones is contained in the TZ database name column of " +
"(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE;

@JsonProperty("destination_timezone")
@JsonPropertyDescription("The time zone used for storing the timestamp in the `destination` field. " +
"A list of all the available time zones is contained in the **TZ database name** column of " +
"[the list of database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
"A list of all the available time zones is contained in the TZ database name column of " +
"(https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List).")
private String destinationTimezone = DEFAULT_DESTINATION_TIMEZONE;

@JsonProperty("locale")
@JsonPropertyDescription("The location used for parsing dates. Commonly used for parsing month names (`MMM`). " +
"The value can contain language, country, or variant fields in IETF BCP 47, such as `en-US`, " +
"or a string representation of the " +
"[locale](https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " +
"locale (https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html) object, such as `en_US`. " +
"A full list of locale fields, including language, country, and variant, can be found in " +
"[the language subtag registry](https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " +
"(https://www.iana.org/assignments/language-subtag-registry/language-subtag-registry). " +
"Default is `Locale.ROOT`.")
private String locale;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Instant;
Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
Expand Down Expand Up @@ -105,6 +107,17 @@ private DateProcessor createObjectUnderTest() {
return new DateProcessor(pluginMetrics, mockDateProcessorConfig, expressionEvaluator);
}

@Test
void invalid_date_when_condition_throws_InvalidPluginConfigurationException() {
final String dateWhen = UUID.randomUUID().toString();

when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen);

when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(false);

assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
}

@Test
void from_time_received_with_default_destination_test() {
when(mockDateProcessorConfig.getFromTimeReceived()).thenReturn(true);
Expand All @@ -130,7 +143,9 @@ void from_time_received_with_default_destination_test() {

@Test
void date_when_does_not_run_date_processor_for_event_with_date_when_as_false() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
final String dateWhen = UUID.randomUUID().toString();
when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen);
when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true);
dateProcessor = createObjectUnderTest();

Map<String, Object> testData = getTestData();
Expand Down Expand Up @@ -526,7 +541,9 @@ void match_without_year_test(String pattern) {

@Test
void date_processor_catches_exceptions_instead_of_throwing() {
when(mockDateProcessorConfig.getDateWhen()).thenReturn(UUID.randomUUID().toString());
final String dateWhen = UUID.randomUUID().toString();
when(mockDateProcessorConfig.getDateWhen()).thenReturn(dateWhen);
when(expressionEvaluator.isValidExpressionStatement(dateWhen)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(any(String.class), any(Event.class)))
.thenThrow(RuntimeException.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.decompress.encoding.EncodingType;
Expand All @@ -16,17 +17,22 @@

public class DecompressProcessorConfig {

@JsonPropertyDescription("The keys in the event that will be decompressed.")
@JsonProperty("keys")
@NotEmpty
@NotNull
private List<String> keys;

@JsonPropertyDescription("The type of decompression to use for the keys in the event. Only gzip is supported.")
@JsonProperty("type")
@NotNull
private DecompressionType decompressionType;

@JsonPropertyDescription("A conditional expression that determines when the decompress processor will run on certain events.")
@JsonProperty("decompress_when")
private String decompressWhen;

@JsonPropertyDescription("A list of strings with which to tag events when the processor fails to decompress the keys inside an event. Defaults to _decompression_failure.")
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure = List.of("_decompression_failure");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -49,6 +50,11 @@ public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfi
dissectorMap.put(key, dissector);
}

if (dissectConfig.getDissectWhen() != null &&
(!expressionEvaluator.isValidExpressionStatement(dissectConfig.getDissectWhen()))) {
throw new InvalidPluginConfigurationException("dissect_when {} is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax");
}

}

@Override
Expand Down
Loading
Loading