Skip to content

Commit

Permalink
Add support for adding metadata entries (#2707)
Browse files Browse the repository at this point in the history
* Add support for adding metadata entries

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Updated documentation

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Updated documentation

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

* Added more unit tests for metadata key set

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka committed May 19, 2023
1 parent f2c9341 commit c04a46b
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@

package org.opensearch.dataprepper.model.event;

import com.google.common.collect.ImmutableMap;

import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;

Expand All @@ -28,7 +27,7 @@ public class DefaultEventMetadata implements EventMetadata {

private final Instant timeReceived;

private final ImmutableMap<String, Object> attributes;
private Map<String, Object> attributes;

private Set<String> tags;

Expand All @@ -41,15 +40,15 @@ private DefaultEventMetadata(final Builder builder) {

this.timeReceived = builder.timeReceived == null ? Instant.now() : builder.timeReceived;

this.attributes = builder.attributes == null ? ImmutableMap.of() : ImmutableMap.copyOf(builder.attributes);
this.attributes = builder.attributes == null ? new HashMap<>() : new HashMap<>(builder.attributes);

this.tags = builder.tags == null ? new HashSet<>() : new HashSet(builder.tags);
}

private DefaultEventMetadata(final EventMetadata eventMetadata) {
this.eventType = eventMetadata.getEventType();
this.timeReceived = eventMetadata.getTimeReceived();
this.attributes = ImmutableMap.copyOf(eventMetadata.getAttributes());
this.attributes = new HashMap<>(eventMetadata.getAttributes());
this.tags = new HashSet<>(eventMetadata.getTags());
}

Expand All @@ -68,6 +67,11 @@ public Map<String, Object> getAttributes() {
return attributes;
}

@Override
public void setAttribute(final String key, final Object value) {
attributes.put(key, value);
}

@Override
public Object getAttribute(final String attributeKey) {
String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public interface EventMetadata extends Serializable {
*/
Object getAttribute(final String key);

/**
* Sets an attribute
* @param key to be set
* @param value to be set
* @since 2.3
*/
void setAttribute(String key, Object value);

/**
* Returns the tags
* @return a set of tags
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,16 @@ public void testGetAttribute(final String key, final Object value) {
assertThat(eventMetadata.getAttribute(key+"/key6"), equalTo(null));
}

@Test
public void testAttributesMutation_throwsAnException() {
final Map<String, Object> attributes = eventMetadata.getAttributes();

assertThrows(UnsupportedOperationException.class, () -> attributes.put("foo", "bar"));
}

@Test
public void testAttributesMutation_without_attributes_throwsAnException() {
@ParameterizedTest
@MethodSource("getAttributeTestInputs")
public void testSetAttribute(String key, final Object value) {
eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.withTimeReceived(testTimeReceived)
.build();
final Map<String, Object> attributes = eventMetadata.getAttributes();

assertThrows(UnsupportedOperationException.class, () -> attributes.put("foo", "bar"));
key = (key.charAt(0) == '/') ? key.substring(1) : key;
eventMetadata.setAttribute(key, value);
assertThat(eventMetadata.getAttribute(key), equalTo(value));
}

@Test
Expand All @@ -139,7 +133,6 @@ public void testAttributes_without_attributes_is_empty() {
assertThat(attributes, notNullValue());
assertThat(attributes.size(), equalTo(0));

assertThrows(UnsupportedOperationException.class, () -> attributes.put("foo", "bar"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;

import java.util.Map;
import java.util.HashMap;
import java.time.Instant;
import com.google.common.collect.ImmutableMap;

abstract class DefaultBaseEventBuilder<T extends Event> implements BaseEventBuilder<T>{
private EventMetadata eventMetadata;
Expand Down Expand Up @@ -70,7 +70,7 @@ public BaseEventBuilder<T> withTimeReceived(final Instant timeReceived) {
public BaseEventBuilder<T> withEventMetadata(final EventMetadata eventMetadata) {
this.eventType = eventMetadata.getEventType();
this.timeReceived = eventMetadata.getTimeReceived();
this.attributes = ImmutableMap.copyOf(eventMetadata.getAttributes());
this.attributes = new HashMap<>(eventMetadata.getAttributes());
return this;
}

Expand Down
3 changes: 2 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ then when we run with the same input, the processor will parse the message into

### Configuration
* `entries` - (required) - A list of entries to add to an event
* `key` - (required) - The key of the new entry to be added
* `key` - (required) - The key of the new entry to be added. One of `key` or `metadata_key` is required.
* `metadata_key` - (required) - The key of the new metadata attribute to be added. Argument must be a literal string key and not JsonPointer. One of `key` or `metadata_key` is required.
* `value` - (optional) - The value of the new entry to be added. Strings, booleans, numbers, null, nested objects, and arrays containing the aforementioned data types are valid to use. Required if `format` is not specified.
* `format` - (optional) - A format string to use as value of the new entry to be added. For example, `${key1}-${ke2}` where `key1` and `key2` are existing keys in the event. Required if `value` is not specified.
* `overwrite_if_key_exists` - (optional) - When set to `true`, if `key` already exists in the event, then the existing value will be overwritten. The default is `false`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
Expand Down Expand Up @@ -48,16 +49,28 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

try {
if (!recordEvent.containsKey(entry.getKey()) || entry.getOverwriteIfKeyExists()) {
if (!Objects.isNull(entry.getFormat())) {
recordEvent.put(entry.getKey(), recordEvent.formatString(entry.getFormat()));
} else {
recordEvent.put(entry.getKey(), entry.getValue());
final String key = entry.getKey();
final String metadataKey = entry.getMetadataKey();
Object value;
if (!Objects.isNull(entry.getFormat())) {
value = recordEvent.formatString(entry.getFormat());
} else {
value = entry.getValue();
}
if (!Objects.isNull(key)) {
if (!recordEvent.containsKey(key) || entry.getOverwriteIfKeyExists()) {
recordEvent.put(key, value);
}
} else {
Map<String, Object> attributes = recordEvent.getMetadata().getAttributes();
if (!attributes.containsKey(metadataKey) || entry.getOverwriteIfKeyExists()) {
recordEvent.getMetadata().setAttribute(metadataKey, value);

}
}
} catch (Exception e) {
LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], format [{}], value [{}]",
entry.getKey(), entry.getValue(), entry.getFormat(), recordEvent, e);
LOG.error(EVENT, "Error adding entry to record [{}] with key [{}], metadataKey [{}], format [{}], value [{}]",
recordEvent, entry.getKey(), entry.getMetadataKey(), entry.getFormat(), entry.getValue(), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

public class AddEntryProcessorConfig {
public static class Entry {
@NotEmpty
@NotNull
private String key;

private String metadataKey;

private Object value;

private String format;
Expand All @@ -34,6 +34,10 @@ public String getKey() {
return key;
}

public String getMetadataKey() {
return metadataKey;
}

public Object getValue() {
return value;
}
Expand All @@ -54,12 +58,20 @@ public boolean hasValueOrFormat() {
}

public Entry(final String key,
final String metadataKey,
final Object value,
final String format,
final boolean overwriteIfKeyExists,
final String addWhen)
{
if (key != null && metadataKey != null) {
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
}
if (key == null && metadataKey == null) {
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
}
this.key = key;
this.metadataKey = metadataKey;
this.value = value;
this.format = format;
this.overwriteIfKeyExists = overwriteIfKeyExists;
Expand Down
Loading

0 comments on commit c04a46b

Please sign in to comment.