From 121195818e39fe16014b87ba6b9707c662fc8e74 Mon Sep 17 00:00:00 2001 From: DE20436406 Date: Tue, 28 Feb 2023 19:56:12 +0530 Subject: [PATCH 1/5] Signed-off-by: deepaksahu562 deepak.sahu562@gmail.com Description Created "s3-sink" plugin. Github issue : #1048 Added Functionality Configurations for the bucket name, key path and key pattern. The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}. Collection of objects from Buffer and store it in RAM/Local file. Check List New functionality s3-sink plugin. New functionality has been documented. New functionality has javadoc added. Commits are signed per the DCO using --signoff By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md --- data-prepper-plugins/s3-sink/README.md | 9 + data-prepper-plugins/s3-sink/build.gradle | 67 +++++++ .../plugins/sink/S3ObjectIndex.java | 111 +++++++++++ .../dataprepper/plugins/sink/S3Sink.java | 116 ++++++++++++ .../plugins/sink/S3SinkConfig.java | 96 ++++++++++ .../plugins/sink/S3SinkService.java | 42 +++++ .../plugins/sink/S3SinkWorker.java | 174 ++++++++++++++++++ .../AwsAuthenticationOptions.java | 82 +++++++++ .../sink/configuration/ObjectOptions.java | 27 +++ .../sink/configuration/ThresholdOptions.java | 50 +++++ .../s3-sink/src/main/resources/pipelines.yaml | 21 +++ settings.gradle | 1 + 12 files changed, 796 insertions(+) create mode 100644 data-prepper-plugins/s3-sink/README.md create mode 100644 data-prepper-plugins/s3-sink/build.gradle create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java create mode 100644 
data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java create mode 100644 data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md new file mode 100644 index 0000000000..1fbdefe20a --- /dev/null +++ b/data-prepper-plugins/s3-sink/README.md @@ -0,0 +1,9 @@ +Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml + +Functional Requirements +1 Provide a mechanism to receive events from the buffer, then process and write them to S3. +2 Codecs encode the events into the desired format based on the configuration. +3 Flush the encoded events into the S3 bucket as objects. +4 Object name is based on the key-pattern. +5 Object length depends on the thresholds provided in the configuration. +6 Thresholds include event count, byte capacity and data collection duration. 
diff --git a/data-prepper-plugins/s3-sink/build.gradle b/data-prepper-plugins/s3-sink/build.gradle new file mode 100644 index 0000000000..acfa7bfff6 --- /dev/null +++ b/data-prepper-plugins/s3-sink/build.gradle @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':data-prepper-api') + implementation 'io.micrometer:micrometer-core' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:sqs' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.11.1' + implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + implementation 'org.mapdb:mapdb:3.0.8' + testImplementation 'org.apache.commons:commons-lang3:3.12.0' + testImplementation project(':data-prepper-test-common') +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket') + systemProperty 'tests.s3source.region', 
System.getProperty('tests.s3source.region') + systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url') + + filter { + includeTestsMatching '*IT' + } +} + diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java new file mode 100644 index 0000000000..31a173a621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Reference to an S3 object key Index patterns. + */ + +public class S3ObjectIndex { + + private static final String TIME_PATTERN_STARTING_SYMBOLS = "\\${"; + + //For matching a string that begins with a "${" and ends with a "}". + //For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched. + private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}"; + + //For matching a string enclosed by "%{" and "}". + //For a string like "data-prepper-${yyyy-MM}", "yyyy-MM" is matched. + private static final String TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION = "\\$\\{(.*?)\\}"; + + private static final ZoneId UTC_ZONE_ID = ZoneId.of(TimeZone.getTimeZone("UTC").getID()); + + S3ObjectIndex() { } + + /* + Create Index with date,time with UniqueID prepended. 
+ */ + public static String getIndexAliasWithDate(final String indexAlias) { + DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); + String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : ""; + return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); + } + + /* + Validate the index with the regular expression pattern. Throws exception if validation fails + */ + public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { + final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); + final Matcher timePatternMatcher = pattern.matcher(indexAlias); + if (timePatternMatcher.find()) { + final String timePattern = timePatternMatcher.group(1); + if (timePatternMatcher.find()) { // check if there is a one more match. + throw new IllegalArgumentException("An index only allows one date-time pattern."); + } + if(timePattern.contains(TIME_PATTERN_STARTING_SYMBOLS)){ //check if it is a nested pattern such as "data-prepper-%{%{yyyy.MM.dd}}" + throw new IllegalArgumentException("An index doesn't allow nested date-time patterns."); + } + validateTimePatternIsAtTheEnd(indexAlias, timePattern); + validateNoSpecialCharsInTimePattern(timePattern); + validateTimePatternGranularity(timePattern); + return DateTimeFormatter.ofPattern(timePattern); + } + return null; + } + + /* + Data Prepper only allows time pattern as a suffix. + */ + private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { + if (!indexAlias.endsWith(timePattern + "}")) { + throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); + } + } + + /* + * Special characters can cause failures in creating indexes. 
+ * */ + private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); + public static void validateNoSpecialCharsInTimePattern(String timePattern) { + boolean containsInvalidCharacter = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> INVALID_CHARS.contains(character)); + if (containsInvalidCharacter) { + throw new IllegalArgumentException("Index time pattern contains one or multiple special characters: " + INVALID_CHARS); + } + } + + /* + * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second + */ + private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); + public static void validateTimePatternGranularity(String timePattern) { + boolean containsUnsupportedTimeSymbol = timePattern.chars() + .mapToObj(c -> (char) c) + .anyMatch(character -> UNSUPPORTED_TIME_GRANULARITY_CHARS.contains(character)); + if (containsUnsupportedTimeSymbol) { + throw new IllegalArgumentException("Index time pattern contains time patterns that are less than one hour: " + + UNSUPPORTED_TIME_GRANULARITY_CHARS); + } + } + + /* + Returns the current UTC Date and Time + */ + public static ZonedDateTime getCurrentUtcTime() { + return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java new file mode 100644 index 0000000000..9708283648 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.Collection; +import java.util.concurrent.ArrayBlockingQueue; +import 
java.util.concurrent.BlockingQueue; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +//import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Implementation class of s3-sink plugin + * + */ +@DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) +public class S3Sink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + + private final S3SinkConfig s3SinkConfig; + private volatile boolean initialized; + private static BlockingQueue eventQueue; + private static boolean isStopRequested; + + //private final Codec codec; + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * + * @param pluginSetting + * @param s3SinkConfig + * @param pluginFactory + */ + @DataPrepperPluginConstructor + public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + super(pluginSetting); + this.s3SinkConfig = s3SinkConfig; + final PluginModel codecConfiguration = s3SinkConfig.getCodec(); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + //codec = 
pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + initialized = Boolean.FALSE; + } + + @Override + public boolean isReady() { + return initialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Failed to initialize S3-Sink."); + this.shutdown(); + throw new RuntimeException(e.getMessage(), e); + } catch (Exception e) { + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + } + } + + private void doInitializeInternal() { + eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); + S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); + S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig); + new Thread(worker).start(); + initialized = Boolean.TRUE; + } + + @Override + public void doOutput(final Collection> records) { + LOG.debug("Records size : {}", records.size()); + if (records.isEmpty()) { + return; + } + + for (final Record recordData : records) { + + Event event = recordData.getData(); + getEventQueue().add(event); + + } + } + + @Override + public void shutdown() { + super.shutdown(); + isStopRequested = Boolean.TRUE; + LOG.info("s3-sink sutdonwn completed"); + } + + public static BlockingQueue getEventQueue() { + return eventQueue; + } + + public static boolean isStopRequested() { + return isStopRequested; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java new file mode 100644 index 0000000000..718ce504b2 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; +import 
org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ObjectOptions; +import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 sink configuration + */ +public class S3SinkConfig { + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("threshold") + @NotNull + private ThresholdOptions thresholdOptions; + + @JsonProperty("object") + @NotNull + private ObjectOptions objectOptions; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("temporary_storage") + @NotNull + private String temporaryStorage; + + @JsonProperty("bucket") + @NotNull + private String bucketName; + + @JsonProperty("key_path_prefix") + @NotNull + private String keyPathPrefix; + + /* + Aws Authentication configuration Options + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /* + Threshold configuration Options + */ + public ThresholdOptions getThresholdOptions() { + return thresholdOptions; + } + + /* + s3 index configuration Options + */ + public ObjectOptions getObjectOptions() { + return objectOptions; + } + + /* + sink codec configuration Options + */ + public PluginModel getCodec() { return codec; } + + /* + s3 index path configuration Option + */ + public String getKeyPathPrefix() { + return keyPathPrefix; + } + + /* + s3 bucket name configuration Option + */ + public String getBucketName() { + return bucketName; + } + + /* + Temporary storage location configuration Options + */ + public String getTemporaryStorage() { + return temporaryStorage; + } +} diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java new file mode 100644 index 0000000000..711a03d621 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.s3.S3Client; + +public class S3SinkService { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); + + private final S3SinkConfig s3SinkConfig; + private final S3Client s3Client; + + S3SinkService(final S3SinkConfig s3SinkConfig){ + this.s3SinkConfig = s3SinkConfig; + this.s3Client = createS3Client(); + } + + + S3Client createS3Client() { + LOG.info("Creating S3 client"); + return S3Client.builder() + .region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration()) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(RetryPolicy.builder().numRetries(5).build()) + .build()) + .build(); + } + + public S3Client getS3Client() { + return s3Client; + } + +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java new file mode 100644 index 0000000000..7840e4224d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -0,0 +1,174 @@ +/* + * 
Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.time.StopWatch; +import org.mapdb.DB; +import org.mapdb.DBMaker; +import org.mapdb.Serializer; +import org.opensearch.dataprepper.model.event.Event; +//import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; +//import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In-order to process bulk records, records splits into numStreams & + * eventsPerChunk. numStreams & eventsPerChunk depends on numEvents provided by + * user in pipelines.yaml eventsPerChunk will be always 20, only numStreams will + * be vary based on numEvents. + * + * numEvents(event_count) must be always divided by 100 completely without any + * remnant. + * + * Ex. 
1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 + */ +public class S3SinkWorker implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + private static final float LOAD_FACTOR = 0.02f; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + private final int numEvents; + private int numStreams; + private final int eventsPerChunk; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + + /** + * + * @param s3SinkService + * @param s3SinkConfig + */ + public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig) { + this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); + this.numStreams = (int) (numEvents * LOAD_FACTOR); + this.eventsPerChunk = numEvents / numStreams; + this.s3SinkService = s3SinkService; + this.s3SinkConfig = s3SinkConfig; + } + + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + inMemmoryAccumulator(); + } else { + localFileAccumulator(); + } + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + + /** + * Accumulates data from buffer and store into in memory + */ + public void inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + try { + StopWatch watch = new StopWatch(); + watch.start(); + int streamCount = 0; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + inMemoryEventMap = new HashMap<>(numStreams); + for (int stream = 0; stream < numStreams; stream++) { + inMemoryEventSet = new HashSet<>(eventsPerChunk); + boolean flag = Boolean.FALSE; + for (int data = 0; data < eventsPerChunk + && 
thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { + Event event = S3Sink.getEventQueue().take(); + inMemoryEventSet.add(event); + byteCount += event.toJsonString().getBytes().length; + flag = Boolean.TRUE; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + if (flag) { + inMemoryEventMap.put(stream, inMemoryEventSet); + streamCount++; + } else { + // Once threshold reached then No more streaming required per snapshot, hence + // terminate the streaming(outer) loop + break; + } + } + + LOG.info( + "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + byteCount, eventCount, eventCollectionDuration, streamCount); + + //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); + } catch (Exception e) { + LOG.error("Exception while storing recoreds into In-Memory", e); + } + } + + /** + * Accumulates data from buffer and store in local file + */ + public void localFileAccumulator() { + DB db = null; + NavigableSet localFileEventSet = null; + int byteCount = 0; + int eventCount = 0; + long eventCollectionDuration = 0; + try { + StopWatch watch = new StopWatch(); + watch.start(); + db = DBMaker.memoryDB().make(); + localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); + for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { + String event = S3Sink.getEventQueue().take().toJsonString(); + byteCount += event.getBytes().length; + localFileEventSet.add(event); + eventCount++; + eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); + } + db.commit(); + LOG.info( + "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + byteCount, eventCount, eventCollectionDuration); + + //new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig).doAccumulate(); + + } catch (Exception e) { + 
LOG.error("Exception while storing recoreds into Local-file", e); + } finally { + if (db !=null && !db.isClosed()) { + db.close(); + } + } + } + + /** + * Bunch of events based on thresholds set in the configuration. The Thresholds + * such as events count, bytes capacity and data collection duration. + * + * @param i + * @param watch + * @param byteCount + * @return + */ + private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { + boolean flag = Boolean.FALSE; + flag = eventCount < numEvents + && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() + && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + return flag; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..73d83d668d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.util.Map; +import java.util.UUID; + +/* + An implementation class AWS Authentication 
configuration + */ +public class AwsAuthenticationOptions { + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + /* + AWS Region configuration + */ + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + /* + Aws Credentials Provider configuration + */ + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder() + .region(getAwsRegion()) + .build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()) + .build(); + + } else { + // use default credential provider + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + + return awsCredentialsProvider; + } +} \ No newline at end of file diff --git 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java new file mode 100644 index 0000000000..8501f70daa --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of Threshold configuration Options + */ +public class ObjectOptions { + private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + + @JsonProperty("file_pattern") + @NotNull + private String filePattern = DEFAULT_KEY_PATTERN; + + /* + Read s3 object index file patten configuration + */ + public String getFilePattern() { + return filePattern; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java new file mode 100644 index 0000000000..b50d1219f5 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; + +/* + An implementation class of s3 index configuration Options + */ +public class ThresholdOptions { + static final int DEFAULT_EVENT_COUNT = 1000; + private static final long 
DEFAULT_BYTE_CAPACITY = 5000000; + private static final long DEFAULT_TIMEOUT = 60; + + @JsonProperty("event_count") + @NotNull + private int eventCount = DEFAULT_EVENT_COUNT; + + @JsonProperty("byte_capacity") + @NotNull + private long byteCapacity = DEFAULT_BYTE_CAPACITY; + + @JsonProperty("event_collection_duration") + private long eventCollectionDuration = DEFAULT_TIMEOUT; + + /* + Read event collection duration configuration + */ + public long getEventCollectionDuration() { + return eventCollectionDuration; + } + + /* + Read byte capacity configuration + */ + public long getByteCapacity() { + return byteCapacity; + } + + /* + Read the event count configuration + */ + public int getEeventCount() { + return eventCount; + } +} diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml new file mode 100644 index 0000000000..6e51f4a11d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -0,0 +1,21 @@ +simple-sample-pipeline: + workers: 4 + delay: "5000" + source: + random: + sink: + - s3: + aws: + region: us-east-1 + sts_role_arn: arn:aws:iam::701869769844 :role/s3-full-access + bucket: dataprepper + key_path_prefix: logdata + object: + file_pattern: logs-${yyyy-MM-dd} + threshold: + event_count: 200 + byte_capacity: 2500 + event_collection_duration: 20 + codec: + json: + temporary_storage: local_file \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 751567838a..a3c749d922 100644 --- a/settings.gradle +++ b/settings.gradle @@ -91,4 +91,5 @@ include 'release:docker' include 'release:maven' include 'e2e-test:peerforwarder' include 'rss-source' +include 'data-prepper-plugins:s3-sink' From 289b625ee6426a6a9427fbcfd303fc3be44295b9 Mon Sep 17 00:00:00 2001 From: DE20436406 Date: Tue, 28 Feb 2023 20:24:34 +0530 Subject: [PATCH 2/5] Created "s3-sink" plugin. Configurations for the bucket name, key path and key pattern. 
The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}. Collection of objects from Buffer and store it in RAM/Local file. Signed-off-by: Deepak Sahu --- .../opensearch/dataprepper/plugins/sink/S3SinkWorker.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index 7840e4224d..27d994505b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -8,14 +8,11 @@ import java.util.HashSet; import java.util.NavigableSet; import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.time.StopWatch; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.opensearch.dataprepper.model.event.Event; -//import org.opensearch.dataprepper.plugins.sink.stream.InMemoryAccumulator; -//import org.opensearch.dataprepper.plugins.sink.stream.LocalFileAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +109,6 @@ public void inMemmoryAccumulator() { "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); - //new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig).doAccumulate(); } catch (Exception e) { LOG.error("Exception while storing recoreds into In-Memory", e); } @@ -143,9 +139,6 @@ public void localFileAccumulator() { LOG.info( "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); - - //new LocalFileAccumulator(localFileEventSet, 
s3SinkService, s3SinkConfig).doAccumulate(); - } catch (Exception e) { LOG.error("Exception while storing recoreds into Local-file", e); } finally { From 2e7d91f9fa53baeb5b3140df04ea5519808bc270 Mon Sep 17 00:00:00 2001 From: DE20436406 Date: Tue, 28 Feb 2023 20:56:30 +0530 Subject: [PATCH 3/5] Json sink codec Signed-off-by: Deepak Sahu --- .../dataprepper/plugins/sink/codec/Codec.java | 32 +++++++++ .../plugins/sink/codec/JsonCodec.java | 66 +++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java new file mode 100644 index 0000000000..488adffb1d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +/** + * A codec parsing data through an output stream. Each implementation of this class should + * support parsing a specific type or format of data. See sub-classes for examples. + */ +public interface Codec { + /** + * Parses an {@link OutputStream}. Implementors should call the {@link Collection} for each + * {@link Record} loaded from the {@link OutputStream}. 
+ * + * @param outputStream The output stream for the json data + * @param eventCollection The collection which holds record events + */ + void parse(OutputStream outputStream, Collection> eventCollection) throws IOException; + + void parse(OutputStream outputStream, Record eventCollection) throws IOException; + + void parse(OutputStream outputStream, Event event) throws IOException; +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java new file mode 100644 index 0000000000..cb13088b39 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.codec; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Objects; + +/** + * An implementation of {@link Codec} which serializes to JSON. 
+ */ +@DataPrepperPlugin(name = "json", pluginType = Codec.class) +public class JsonCodec implements Codec { + private final ObjectMapper objectMapper = new ObjectMapper(); + + /* + * Generates a serialized json string of the Events + */ + + @Override + public void parse(final OutputStream outputStream, final Collection> eventCollection) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + StringBuilder recordEventData = new StringBuilder(); + for (final Record recordEvent : eventCollection) { + recordEventData.append(recordEvent.getData().toJsonString()); + + } + objectMapper.writeValue(outputStream, recordEventData.toString()); + } + + /* + * Generates a serialized json string of the Events + */ + @Override + public void parse(OutputStream outputStream, Record eventCollection) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(eventCollection); + + objectMapper.writeValue(outputStream, eventCollection.getData().toJsonString()); + + } + /* + * Generates a serialized json string of the Event + */ + @Override + public void parse(OutputStream outputStream, Event event) throws IOException + { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(event); + + objectMapper.writeValue(outputStream, event.toJsonString()); + + } +} \ No newline at end of file From 7ff57432d3e193720586fc3b09a9e0b4349ac3c0 Mon Sep 17 00:00:00 2001 From: deepaksahu562 Date: Thu, 2 Mar 2023 16:03:12 +0530 Subject: [PATCH 4/5] Re-factor the s3-sink code & Apply json codec. 
Signed-off-by: deepaksahu562 --- .../dataprepper/plugins/sink/S3Sink.java | 49 ++++++++++--- .../plugins/sink/S3SinkConfig.java | 11 ++- .../plugins/sink/S3SinkService.java | 4 +- .../plugins/sink/S3SinkWorker.java | 72 +++++++++---------- .../sink/accumulator/SinkAccumulator.java | 10 +++ .../dataprepper/plugins/sink/codec/Codec.java | 7 +- .../plugins/sink/codec/JsonCodec.java | 13 ++-- .../AwsAuthenticationOptions.java | 54 +++++++------- .../sink/configuration/ObjectOptions.java | 5 +- .../sink/configuration/ThresholdOptions.java | 9 +-- 10 files changed, 136 insertions(+), 98 deletions(-) create mode 100644 data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 9708283648..70c6978a25 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -18,12 +18,11 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; -//import org.opensearch.dataprepper.plugins.sink.codec.Codec; +import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - /** * Implementation class of s3-sink plugin * @@ -33,14 +32,19 @@ public class S3Sink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); private static final int EVENT_QUEUE_SIZE = 100000; - + private static final String IN_MEMORY = "in_memory"; + private static final String 
LOCAL_FILE = "local_file"; + private final S3SinkConfig s3SinkConfig; + private S3SinkWorker worker; + private SinkAccumulator accumulator; + private final Codec codec; private volatile boolean initialized; private static BlockingQueue eventQueue; private static boolean isStopRequested; + private Thread workerThread; + - //private final Codec codec; - private final ObjectMapper objectMapper = new ObjectMapper(); /** * @@ -54,7 +58,7 @@ public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, fina this.s3SinkConfig = s3SinkConfig; final PluginModel codecConfiguration = s3SinkConfig.getCodec(); final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - //codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); initialized = Boolean.FALSE; } @@ -79,8 +83,10 @@ public void doInitialize() { private void doInitializeInternal() { eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE); S3SinkService s3SinkService = new S3SinkService(s3SinkConfig); - S3SinkWorker worker = new S3SinkWorker(s3SinkService, s3SinkConfig); - new Thread(worker).start(); + worker = new S3SinkWorker(s3SinkService, s3SinkConfig, codec); + S3SinkWorkerRunner runner = new S3SinkWorkerRunner(); + workerThread = new Thread(runner); + workerThread.start(); initialized = Boolean.TRUE; } @@ -92,10 +98,8 @@ public void doOutput(final Collection> records) { } for (final Record recordData : records) { - Event event = recordData.getData(); getEventQueue().add(event); - } } @@ -103,6 +107,9 @@ public void doOutput(final Collection> records) { public void shutdown() { super.shutdown(); isStopRequested = Boolean.TRUE; + if (workerThread.isAlive()) { + workerThread.stop(); + } LOG.info("s3-sink sutdonwn completed"); } @@ -113,4 +120,24 @@ public static BlockingQueue getEventQueue() { public static boolean isStopRequested() { return 
isStopRequested; } + + private class S3SinkWorkerRunner implements Runnable { + @Override + public void run() { + try { + while (!S3Sink.isStopRequested()) { + if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + accumulator = worker.inMemmoryAccumulator(); + } else { + accumulator = worker.localFileAccumulator(); + } + accumulator.doAccumulate(); + } + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(), + e.getCause(), e); + } + } + } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index 718ce504b2..923964f349 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -18,6 +18,11 @@ */ public class S3SinkConfig { + static final String DEFAULT_BUCKET_NAME = "dataprepper"; + static final String DEFAULT_PATH_PREFIX = "logdata"; + + static final String DEFAULT_TEMP_STORAGE = "local_file"; + @JsonProperty("aws") @NotNull @Valid @@ -37,15 +42,15 @@ public class S3SinkConfig { @JsonProperty("temporary_storage") @NotNull - private String temporaryStorage; + private String temporaryStorage = DEFAULT_TEMP_STORAGE; @JsonProperty("bucket") @NotNull - private String bucketName; + private String bucketName = DEFAULT_BUCKET_NAME; @JsonProperty("key_path_prefix") @NotNull - private String keyPathPrefix; + private String keyPathPrefix = DEFAULT_PATH_PREFIX; /* Aws Authentication configuration Options diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 711a03d621..d897e2be46 
100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -10,7 +10,9 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.services.s3.S3Client; - +/** + * Create s3 client + */ public class S3SinkService { private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index 27d994505b..67bcae524c 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -4,15 +4,20 @@ */ package org.opensearch.dataprepper.plugins.sink; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; import java.util.HashMap; import java.util.HashSet; import java.util.NavigableSet; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.time.StopWatch; import org.mapdb.DB; import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; +import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,58 +33,42 @@ * Ex. 
1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 */ -public class S3SinkWorker implements Runnable { +public class S3SinkWorker { private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); private static final float LOAD_FACTOR = 0.02f; - private static final String IN_MEMORY = "in_memory"; - private static final String LOCAL_FILE = "local_file"; + private final S3SinkService s3SinkService; + private final S3SinkConfig s3SinkConfig; + private final Codec codec; + private SinkAccumulator accumulator; private final int numEvents; private int numStreams; private final int eventsPerChunk; - private final S3SinkService s3SinkService; - private final S3SinkConfig s3SinkConfig; - + /** * * @param s3SinkService * @param s3SinkConfig */ - public S3SinkWorker(S3SinkService s3SinkService, S3SinkConfig s3SinkConfig) { + public S3SinkWorker(final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, final Codec codec) { this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); - this.numStreams = (int) (numEvents * LOAD_FACTOR); - this.eventsPerChunk = numEvents / numStreams; this.s3SinkService = s3SinkService; this.s3SinkConfig = s3SinkConfig; - } - - @Override - public void run() { - try { - while (!S3Sink.isStopRequested()) { - if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { - inMemmoryAccumulator(); - } else { - localFileAccumulator(); - } - } - } catch (Exception e) { - e.printStackTrace(); - LOG.error("Exception in S3SinkWorker : \n Error message {} \n Exception cause {}", e.getMessage(), - e.getCause(), e); - } + this.codec = codec; + numStreams = (int) (numEvents * LOAD_FACTOR); + eventsPerChunk = numEvents / numStreams; } /** * Accumulates data from buffer and store into in memory */ - public void inMemmoryAccumulator() { - HashSet inMemoryEventSet = null; - HashMap> inMemoryEventMap = null; + public SinkAccumulator 
inMemmoryAccumulator() { + HashSet inMemoryEventSet = null; + HashMap> inMemoryEventMap = null; + int streamCount = 0; try { StopWatch watch = new StopWatch(); watch.start(); - int streamCount = 0; int byteCount = 0; int eventCount = 0; long eventCollectionDuration = 0; @@ -90,7 +79,9 @@ public void inMemmoryAccumulator() { for (int data = 0; data < eventsPerChunk && thresholdsCheck(eventCount, watch, byteCount); data++, eventCount++) { Event event = S3Sink.getEventQueue().take(); - inMemoryEventSet.add(event); + OutputStream outPutStream = new ByteArrayOutputStream(); + codec.parse(outPutStream, event); + inMemoryEventSet.add(outPutStream.toString()); byteCount += event.toJsonString().getBytes().length; flag = Boolean.TRUE; eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); @@ -109,15 +100,18 @@ public void inMemmoryAccumulator() { "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); + //accumulator = new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig); } catch (Exception e) { LOG.error("Exception while storing recoreds into In-Memory", e); } + + return accumulator; } /** * Accumulates data from buffer and store in local file */ - public void localFileAccumulator() { + public SinkAccumulator localFileAccumulator() { DB db = null; NavigableSet localFileEventSet = null; int byteCount = 0; @@ -129,9 +123,11 @@ public void localFileAccumulator() { db = DBMaker.memoryDB().make(); localFileEventSet = db.treeSet("mySet").serializer(Serializer.STRING).createOrOpen(); for (int data = 0; thresholdsCheck(data, watch, byteCount); data++) { - String event = S3Sink.getEventQueue().take().toJsonString(); - byteCount += event.getBytes().length; - localFileEventSet.add(event); + Event event = S3Sink.getEventQueue().take(); + OutputStream outPutStream = new ByteArrayOutputStream(); + 
codec.parse(outPutStream, event); + byteCount += event.toJsonString().getBytes().length; + localFileEventSet.add(outPutStream.toString()); eventCount++; eventCollectionDuration = watch.getTime(TimeUnit.SECONDS); } @@ -139,13 +135,11 @@ public void localFileAccumulator() { LOG.info( "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); + //accumulator = new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig, db); } catch (Exception e) { LOG.error("Exception while storing recoreds into Local-file", e); - } finally { - if (db !=null && !db.isClosed()) { - db.close(); - } } + return accumulator; } /** diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java new file mode 100644 index 0000000000..fff05ff275 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.accumulator; + +public interface SinkAccumulator { + + void doAccumulate(); +} diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java index 488adffb1d..1f08f2c491 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/Codec.java @@ -5,13 +5,13 @@ package org.opensearch.dataprepper.plugins.sink.codec; -import org.opensearch.dataprepper.model.event.Event; 
-import org.opensearch.dataprepper.model.record.Record; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + /** * A codec parsing data through an output stream. Each implementation of this class should * support parsing a specific type or format of data. See sub-classes for examples. @@ -29,4 +29,5 @@ public interface Codec { void parse(OutputStream outputStream, Record eventCollection) throws IOException; void parse(OutputStream outputStream, Event event) throws IOException; + } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java index cb13088b39..6f3762cdf1 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -5,16 +5,17 @@ package org.opensearch.dataprepper.plugins.sink.codec; -import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; -import org.opensearch.dataprepper.model.event.Event; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.dataprepper.model.record.Record; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.Objects; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import com.fasterxml.jackson.databind.ObjectMapper; + /** * An implementation of {@link Codec} which serializes to JSON. 
*/ @@ -61,6 +62,6 @@ public void parse(OutputStream outputStream, Event event) throws IOException Objects.requireNonNull(event); objectMapper.writeValue(outputStream, event.toJsonString()); - + } } \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java index 73d83d668d..3ac3e19cbe 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/AwsAuthenticationOptions.java @@ -5,7 +5,11 @@ package org.opensearch.dataprepper.plugins.sink.configuration; +import java.util.Map; +import java.util.UUID; + import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.Size; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -15,10 +19,7 @@ import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import java.util.Map; -import java.util.UUID; - -/* +/** An implementation class AWS Authentication configuration */ public class AwsAuthenticationOptions { @@ -47,32 +48,27 @@ public Region getAwsRegion() { public AwsCredentialsProvider authenticateAwsConfiguration() { final AwsCredentialsProvider awsCredentialsProvider; - if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { - try { - Arn.fromString(awsStsRoleArn); - } catch (final Exception e) { - throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); - } + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new 
IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + final StsClient stsClient = StsClient.builder().region(getAwsRegion()).build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("S3-Sink-" + UUID.randomUUID()).roleArn(awsStsRoleArn); + + if (awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder.overrideConfiguration( + configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()).build(); - final StsClient stsClient = StsClient.builder() - .region(getAwsRegion()) - .build(); - - AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() - .roleSessionName("S3-Sink-" + UUID.randomUUID()) - .roleArn(awsStsRoleArn); - - if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { - assumeRoleRequestBuilder = assumeRoleRequestBuilder - .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); - } - - awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() - .stsClient(stsClient) - .refreshRequest(assumeRoleRequestBuilder.build()) - .build(); - - } else { + } else { // use default credential provider awsCredentialsProvider = DefaultCredentialsProvider.create(); } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java index 8501f70daa..9f63bb31a8 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java 
@@ -6,13 +6,14 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.NotNull; -/* +/** An implementation class of Threshold configuration Options */ public class ObjectOptions { - private static final String DEFAULT_KEY_PATTERN = "logs-${YYYY-MM-DD hh:mm:ss}"; + private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}"; @JsonProperty("file_pattern") @NotNull diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java index b50d1219f5..b67bf21fd5 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -6,15 +6,16 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; + import jakarta.validation.constraints.NotNull; -/* +/** An implementation class of s3 index configuration Options */ public class ThresholdOptions { - static final int DEFAULT_EVENT_COUNT = 1000; - private static final long DEFAULT_BYTE_CAPACITY = 5000000; - private static final long DEFAULT_TIMEOUT = 60; + static final int DEFAULT_EVENT_COUNT = 200; + private static final long DEFAULT_BYTE_CAPACITY = 2500; + private static final long DEFAULT_TIMEOUT = 20; @JsonProperty("event_count") @NotNull From 1077fadb0cc302022089d1262e12f1a41fda579a Mon Sep 17 00:00:00 2001 From: deepaksahu562 Date: Fri, 3 Mar 2023 20:30:45 +0530 Subject: [PATCH 5/5] Addressed review comments on PR#2324, Git-issue#1048 Signed-off-by: Deepak Sahu --- .../plugins/sink/S3ObjectIndex.java | 24 ++--- .../dataprepper/plugins/sink/S3Sink.java | 48 
+++++----- .../plugins/sink/S3SinkConfig.java | 41 ++++----- .../plugins/sink/S3SinkService.java | 6 ++ .../plugins/sink/S3SinkWorker.java | 38 +++++--- .../sink/accumulator/SinkAccumulator.java | 3 + .../plugins/sink/codec/JsonCodec.java | 4 +- .../AwsAuthenticationOptions.java | 10 +-- .../sink/configuration/ObjectOptions.java | 7 +- .../sink/configuration/ThresholdOptions.java | 89 ++++++++++++++----- .../s3-sink/src/main/resources/pipelines.yaml | 6 +- 11 files changed, 173 insertions(+), 103 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java index 31a173a621..bcc6f7c06e 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3ObjectIndex.java @@ -35,8 +35,8 @@ public class S3ObjectIndex { S3ObjectIndex() { } - /* - Create Index with date,time with UniqueID prepended. + /** + * Create Index with date,time with UniqueID prepended. */ public static String getIndexAliasWithDate(final String indexAlias) { DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias); @@ -44,8 +44,8 @@ public static String getIndexAliasWithDate(final String indexAlias) { return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + UUID.randomUUID(); } - /* - Validate the index with the regular expression pattern. Throws exception if validation fails + /** + * Validate the index with the regular expression pattern. 
Throws exception if validation fails */ public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) { final Pattern pattern = Pattern.compile(TIME_PATTERN_INTERNAL_EXTRACTOR_REGULAR_EXPRESSION); @@ -66,18 +66,18 @@ public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) return null; } - /* - Data Prepper only allows time pattern as a suffix. - */ + /** + * Data Prepper only allows time pattern as a suffix. + */ private static void validateTimePatternIsAtTheEnd(final String indexAlias, final String timePattern) { if (!indexAlias.endsWith(timePattern + "}")) { throw new IllegalArgumentException("Time pattern can only be a suffix of an index."); } } - /* + /** * Special characters can cause failures in creating indexes. - * */ + */ private static final Set INVALID_CHARS = Set.of('#', '\\', '/', '*', '?', '"', '<', '>', '|', ',', ':'); public static void validateNoSpecialCharsInTimePattern(String timePattern) { boolean containsInvalidCharacter = timePattern.chars() @@ -88,7 +88,7 @@ public static void validateNoSpecialCharsInTimePattern(String timePattern) { } } - /* + /** * Validates the time pattern, support creating indexes with time patterns that are too granular hour, minute and second */ private static final Set UNSUPPORTED_TIME_GRANULARITY_CHARS = Set.of('A', 'n', 'N'); @@ -102,8 +102,8 @@ public static void validateTimePatternGranularity(String timePattern) { } } - /* - Returns the current UTC Date and Time + /** + * Returns the current UTC Date and Time */ public static ZonedDateTime getCurrentUtcTime() { return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 70c6978a25..60c6f9462c 100644 --- 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -25,40 +25,39 @@ /** * Implementation class of s3-sink plugin - * */ @DataPrepperPlugin(name = "s3", pluginType = Sink.class, pluginConfigurationType = S3SinkConfig.class) public class S3Sink extends AbstractSink> { - private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); - private static final int EVENT_QUEUE_SIZE = 100000; - private static final String IN_MEMORY = "in_memory"; - private static final String LOCAL_FILE = "local_file"; - - private final S3SinkConfig s3SinkConfig; - private S3SinkWorker worker; - private SinkAccumulator accumulator; + private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class); + private static final int EVENT_QUEUE_SIZE = 100000; + private static final String IN_MEMORY = "in_memory"; + private static final String LOCAL_FILE = "local_file"; + + private final S3SinkConfig s3SinkConfig; + private S3SinkWorker worker; + private SinkAccumulator accumulator; private final Codec codec; + private final String storageType; private volatile boolean initialized; private static BlockingQueue eventQueue; - private static boolean isStopRequested; + private static volatile boolean isStopRequested; private Thread workerThread; - - /** - * * @param pluginSetting * @param s3SinkConfig * @param pluginFactory */ @DataPrepperPluginConstructor - public S3Sink(PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { + public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig, final PluginFactory pluginFactory) { super(pluginSetting); this.s3SinkConfig = s3SinkConfig; + storageType = s3SinkConfig.getTemporaryStorage(); final PluginModel codecConfiguration = s3SinkConfig.getCodec(); - final PluginSetting codecPluginSettings = new 
PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); - codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), + codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); initialized = Boolean.FALSE; } @@ -72,11 +71,11 @@ public void doInitialize() { try { doInitializeInternal(); } catch (InvalidPluginConfigurationException e) { - LOG.error("Failed to initialize S3-Sink."); + LOG.error("Failed to initialize S3-Sink.", e); this.shutdown(); throw new RuntimeException(e.getMessage(), e); } catch (Exception e) { - LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} \n {}", e.getMessage(), e.getCause()); + LOG.warn("Failed to initialize S3-Sink, retrying. Error - {} ", e.getMessage(), e); } } @@ -96,7 +95,7 @@ public void doOutput(final Collection> records) { if (records.isEmpty()) { return; } - + for (final Record recordData : records) { Event event = recordData.getData(); getEventQueue().add(event); @@ -120,13 +119,16 @@ public static BlockingQueue getEventQueue() { public static boolean isStopRequested() { return isStopRequested; } - + + /** + * This {@link S3SinkWorkerRunner} keep listing event from {@link doOutput} + */ private class S3SinkWorkerRunner implements Runnable { @Override public void run() { try { while (!S3Sink.isStopRequested()) { - if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) { + if (storageType.equalsIgnoreCase(IN_MEMORY)) { accumulator = worker.inMemmoryAccumulator(); } else { accumulator = worker.localFileAccumulator(); @@ -134,9 +136,7 @@ public void run() { accumulator.doAccumulate(); } } catch (Exception e) { - e.printStackTrace(); - LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(), - e.getCause(), e); + LOG.error("Exception while runing S3SinkWorkerRunner : ", e); } } } 
diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java index 923964f349..1ee49be362 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkConfig.java @@ -13,15 +13,12 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -/* - An implementation class of s3 sink configuration +/** + * An implementation class of s3 sink configuration */ public class S3SinkConfig { - static final String DEFAULT_BUCKET_NAME = "dataprepper"; - static final String DEFAULT_PATH_PREFIX = "logdata"; - - static final String DEFAULT_TEMP_STORAGE = "local_file"; + static final String DEFAULT_TEMP_STORAGE = "local_file"; @JsonProperty("aws") @NotNull @@ -46,54 +43,54 @@ public class S3SinkConfig { @JsonProperty("bucket") @NotNull - private String bucketName = DEFAULT_BUCKET_NAME; + private String bucketName; @JsonProperty("key_path_prefix") @NotNull - private String keyPathPrefix = DEFAULT_PATH_PREFIX; + private String keyPathPrefix; - /* - Aws Authentication configuration Options + /** + * Aws Authentication configuration Options */ public AwsAuthenticationOptions getAwsAuthenticationOptions() { return awsAuthenticationOptions; } - /* - Threshold configuration Options + /** + * Threshold configuration Options */ public ThresholdOptions getThresholdOptions() { return thresholdOptions; } - /* - s3 index configuration Options + /** + * s3 index configuration Options */ public ObjectOptions getObjectOptions() { return objectOptions; } - /* - sink codec configuration Options + /** + * sink codec configuration Options */ public PluginModel getCodec() { return codec; } - /* - s3 index path configuration Option + /** + * s3 index path configuration Option */ 
public String getKeyPathPrefix() { return keyPathPrefix; } - /* - s3 bucket name configuration Option + /** + * s3 bucket name configuration Option */ public String getBucketName() { return bucketName; } - /* - Temporary storage location configuration Options + /** + * Temporary storage location configuration Options */ public String getTemporaryStorage() { return temporaryStorage; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index d897e2be46..6b79188994 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -20,6 +20,9 @@ public class S3SinkService { private final S3SinkConfig s3SinkConfig; private final S3Client s3Client; + /** + * @param s3SinkConfig + */ S3SinkService(final S3SinkConfig s3SinkConfig){ this.s3SinkConfig = s3SinkConfig; this.s3Client = createS3Client(); @@ -37,6 +40,9 @@ S3Client createS3Client() { .build(); } + /** + * @return s3Client object + */ public S3Client getS3Client() { return s3Client; } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java index 67bcae524c..2fe689c195 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkWorker.java @@ -16,6 +16,9 @@ import org.mapdb.DBMaker; import org.mapdb.Serializer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.types.ByteCount; +import 
org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryAccumulator; +import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileAccumulator; import org.opensearch.dataprepper.plugins.sink.accumulator.SinkAccumulator; import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; @@ -30,31 +33,41 @@ * numEvents(event_count) must be always divided by 100 completely without any * remnant. * - * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 + * {@code LOAD_FACTOR} required to divide collections of records (numEvents) + * into streams + * + * Ex. 1) if numEvents = 100 then numStreams = 2 and eventsPerChunk = 50 * 2) if numEvents = 1000 then numStreams = 20 and eventsPerChunk = 50 */ public class S3SinkWorker { private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class); + /** + * {@code LOAD_FACTOR} required to divide collections of records into streams + */ private static final float LOAD_FACTOR = 0.02f; private final S3SinkService s3SinkService; private final S3SinkConfig s3SinkConfig; private final Codec codec; private SinkAccumulator accumulator; private final int numEvents; - private int numStreams; + private final ByteCount byteCapacity; + private final long duration; + private final int numStreams; private final int eventsPerChunk; /** - * * @param s3SinkService * @param s3SinkConfig */ public S3SinkWorker(final S3SinkService s3SinkService, final S3SinkConfig s3SinkConfig, final Codec codec) { - this.numEvents = s3SinkConfig.getThresholdOptions().getEeventCount(); this.s3SinkService = s3SinkService; this.s3SinkConfig = s3SinkConfig; this.codec = codec; + numEvents = s3SinkConfig.getThresholdOptions().getEventCount(); + byteCapacity = s3SinkConfig.getThresholdOptions().getByteCapacity(); + duration = s3SinkConfig.getThresholdOptions().getEventCollectionDuration().getSeconds(); + numStreams = (int) (numEvents * LOAD_FACTOR); eventsPerChunk = numEvents / numStreams; } @@ -91,13 +104,16 @@ 
public SinkAccumulator inMemmoryAccumulator() { streamCount++; } else { // Once threshold reached then No more streaming required per snapshot, hence - // terminate the streaming(outer) loop + // stop the streaming(outer) loop break; } } LOG.info( - "In-Memory snapshot info : Byte_count = {} Bytes \t Event_count = {} Records \t Event_collection_duration = {} sec & \t Number of stream {}", + "In-Memory snapshot info : Byte_count = {} Bytes " + + "\t Event_count = {} Records " + + "\t Event_collection_duration = {} sec & " + + "\t Number of stream {}", byteCount, eventCount, eventCollectionDuration, streamCount); //accumulator = new InMemoryAccumulator(inMemoryEventMap, streamCount, s3SinkService, s3SinkConfig); @@ -133,7 +149,9 @@ public SinkAccumulator localFileAccumulator() { } db.commit(); LOG.info( - "Local-File snapshot info : Byte_count = {} Bytes, \t Event_count = {} Records \n & Event_collection_duration = {} Sec", + "Local-File snapshot info : Byte_count = {} Bytes, " + + "\t Event_count = {} Records " + + "\t & Event_collection_duration = {} Sec", byteCount, eventCount, eventCollectionDuration); //accumulator = new LocalFileAccumulator(localFileEventSet, s3SinkService, s3SinkConfig, db); } catch (Exception e) { @@ -146,7 +164,7 @@ public SinkAccumulator localFileAccumulator() { * Bunch of events based on thresholds set in the configuration. The Thresholds * such as events count, bytes capacity and data collection duration. 
* - * @param i + * @param eventCount * @param watch * @param byteCount * @return @@ -154,8 +172,8 @@ public SinkAccumulator localFileAccumulator() { private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) { boolean flag = Boolean.FALSE; flag = eventCount < numEvents - && watch.getTime(TimeUnit.SECONDS) < s3SinkConfig.getThresholdOptions().getEventCollectionDuration() - && byteCount < s3SinkConfig.getThresholdOptions().getByteCapacity(); + && watch.getTime(TimeUnit.SECONDS) < duration + && byteCount < byteCapacity.getBytes(); return flag; } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java index fff05ff275..5def7e4e06 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/SinkAccumulator.java @@ -4,6 +4,9 @@ */ package org.opensearch.dataprepper.plugins.sink.accumulator; +/** + * {@link SinkAccumulator} Accumulate buffer records + */ public interface SinkAccumulator { void doAccumulate(); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java index 6f3762cdf1..486d98279b 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/codec/JsonCodec.java @@ -23,7 +23,7 @@ public class JsonCodec implements Codec { private final ObjectMapper objectMapper = new ObjectMapper(); - /* + /** * Generates a serialized json string of the Events */ @@ -40,7 +40,7 @@ public void 
parse(final OutputStream outputStream, final Collection awsStsHeaderOverrides; - /* - AWS Region configuration + /** + * AWS Region configuration */ public Region getAwsRegion() { return awsRegion != null ? Region.of(awsRegion) : null; } - /* - Aws Credentials Provider configuration + /** + * Aws Credentials Provider configuration */ public AwsCredentialsProvider authenticateAwsConfiguration() { diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java index 9f63bb31a8..e5b99726ef 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ObjectOptions.java @@ -6,11 +6,10 @@ package org.opensearch.dataprepper.plugins.sink.configuration; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.NotNull; /** - An implementation class of Threshold configuration Options + * An implementation class of Threshold configuration Options */ public class ObjectOptions { private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}"; @@ -19,8 +18,8 @@ public class ObjectOptions { @NotNull private String filePattern = DEFAULT_KEY_PATTERN; - /* - Read s3 object index file patten configuration + /** + * Read s3 object index file patten configuration */ public String getFilePattern() { return filePattern; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java index b67bf21fd5..331a4a9d73 100644 --- 
a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/configuration/ThresholdOptions.java @@ -5,17 +5,25 @@ package org.opensearch.dataprepper.plugins.sink.configuration; +import java.time.Duration; +import java.time.format.DateTimeParseException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.opensearch.dataprepper.model.types.ByteCount; import com.fasterxml.jackson.annotation.JsonProperty; - import jakarta.validation.constraints.NotNull; /** - An implementation class of s3 index configuration Options + * An implementation class of s3 index configuration Options */ public class ThresholdOptions { - static final int DEFAULT_EVENT_COUNT = 200; - private static final long DEFAULT_BYTE_CAPACITY = 2500; - private static final long DEFAULT_TIMEOUT = 20; + + private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); + + static final int DEFAULT_EVENT_COUNT = 2000; + private static final String DEFAULT_BYTE_CAPACITY = "50mb"; + private static final String DEFAULT_TIMEOUT = "180s"; @JsonProperty("event_count") @NotNull @@ -23,29 +31,68 @@ public class ThresholdOptions { @JsonProperty("byte_capacity") @NotNull - private long byteCapacity = DEFAULT_BYTE_CAPACITY; + private String byteCapacity = DEFAULT_BYTE_CAPACITY; @JsonProperty("event_collection_duration") - private long eventCollectionDuration = DEFAULT_TIMEOUT; + private String eventCollectionDuration = DEFAULT_TIMEOUT; + + /** + * Read event collection duration configuration + */ + public Duration getEventCollectionDuration() { - /* - Read event collection duration configuration - */ - public long getEventCollectionDuration() { - return eventCollectionDuration; + Duration duration; + try { + duration = 
Duration.parse(eventCollectionDuration); + } catch (final DateTimeParseException e) { + duration = parseSimpleDuration(eventCollectionDuration); + if (duration == null) { + throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); + } + } + return duration; } - /* - Read byte capacity configuration - */ - public long getByteCapacity() { - return byteCapacity; + /** + * Read byte capacity configuration + */ + public ByteCount getByteCapacity() { + return ByteCount.parse(byteCapacity); } - /* - Read the event count configuration - */ - public int getEeventCount() { + /** + * Read the event count configuration + */ + public int getEventCount() { return eventCount; } + + /** + * parse event duration configuration + */ + private Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { + final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); + final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); + if (!matcher.find()) { + return null; + } + + final long durationNumber = Long.parseLong(matcher.group(1)); + final String durationUnit = matcher.group(2); + + return getDurationFromUnitAndNumber(durationNumber, durationUnit); + } + + /** + * Return Duration in seconds/milliseconds of configuration Event Collection Duration + */ + private Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { + switch (durationUnit) { + case "s": + return Duration.ofSeconds(durationNumber); + case "ms": + return Duration.ofMillis(durationNumber); + } + return null; + } } diff --git a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml index 6e51f4a11d..1b2d0409db 100644 --- a/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml +++ 
b/data-prepper-plugins/s3-sink/src/main/resources/pipelines.yaml @@ -13,9 +13,9 @@ simple-sample-pipeline: object: file_pattern: logs-${yyyy-MM-dd} threshold: - event_count: 200 - byte_capacity: 2500 - event_collection_duration: 20 + event_count: 2000 + byte_capacity: 50mb + event_collection_duration: PT2M codec: json: temporary_storage: local_file \ No newline at end of file