

git-issue#1048
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
deepaksahu562 committed Apr 17, 2023
1 parent 834af7c commit b8c0b81
Showing 15 changed files with 165 additions and 181 deletions.
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.s3keyindex;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@@ -41,7 +42,18 @@ public class S3ObjectIndex {
public static String getIndexAliasWithDate(final String indexAlias) {
DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
String suffix = (dateFormatter != null) ? dateFormatter.format(getCurrentUtcTime()) : "";
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + "-" + UUID.randomUUID();
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix + "-" + getTimeNanos() + "-"
+ UUID.randomUUID();
}

/**
* Returns the current time in epoch nanoseconds.
*/
public static long getTimeNanos() {
Instant time = Instant.now();
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
return currentTimeNanos;
}

/**
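For reference, the object key produced after this change concatenates the formatted `file_pattern`, the epoch-nanosecond timestamp, a random UUID, and the codec file extension. The snippet below is a minimal, standalone sketch of that assembly; the literal date prefix and `.json` extension are illustrative stand-ins, not taken from the plugin source.

```java
import java.time.Instant;
import java.util.UUID;

public class ObjectKeySketch {
    public static void main(String[] args) {
        Instant now = Instant.now();
        // Same arithmetic as getTimeNanos(): epoch seconds scaled to nanos plus the nano remainder.
        long epochNanos = now.getEpochSecond() * 1_000_000_000L + now.getNano();
        // "events-2023-04-17" stands in for the formatted file_pattern prefix;
        // ".json" stands in for the codec file extension appended later by the buffer.
        String objectKey = "events-2023-04-17-" + epochNanos + "-" + UUID.randomUUID() + ".json";
        System.out.println(objectKey);
        // e.g. events-2023-04-17-1681732800123456789-3f1c0d1a-....json
    }
}
```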
37 changes: 19 additions & 18 deletions data-prepper-plugins/s3-sink/README.md
@@ -1,6 +1,6 @@
# S3 Sink

This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client. You can use the sink to send data to OpenSearch, Amazon OpenSearch Service, or Elasticsearch.
This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client.

The S3 sink plugin supports OpenSearch 2.0.0 and greater.

@@ -17,45 +17,46 @@ pipeline:
- s3:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::701869769844 :role/s3-full-access
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
aws_sts_header_overrides:
s3_client_connect_retries: 5
s3_object_upload_retries: 3
bucket: dataprepper
key_path_prefix: logdata
s3_object_upload_retries: 5
bucket: bucket_name
key_path_prefix:
object:
file_pattern: logs-${yyyy-MM-dd}
file_pattern: events-${yyyy-MM-dd}
threshold:
event_count: 2000
byte_capacity: 50mb
event_collection_duration: PT3M
event_collection_duration: 20s
codec:
json:
temporary_storage: local_file
```

## Configuration

- `aws_region`: A String represents the region of Amazon OpenSearch Service domain, e.g. us-west-2. Only applies to Amazon OpenSearch Service. Defaults to `us-east-1`.
- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).

- `aws_sts_role_arn`: A IAM role arn which the sink plugin will assume to sign request to Amazon OpenSearch Service. If not provided the plugin will use the default credentials.
- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).

- `aws_sts_header_overrides`: An optional map of header overrides to make when assuming the IAM role for the sink plugin.
- `aws_sts_header_overrides` (Optional) : A map of header overrides to apply when assuming the IAM role for the sink plugin.

- `s3_client_connect_retries`: An integer value indicates the maximum number of times that a single request should be retried in-order to connect s3 client.
- `max_connection_retries` (Optional) : The maximum number of times a single request is retried while establishing the S3 client connection.

- `s3_object_upload_retries`: An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon s3.
- `max_upload_retries` (Optional) : The maximum number of times a single request is retried when uploading data to Amazon S3.

- `bucket`(required): Object storage built to store and retrieve any amount of data from anywhere, User must provide bucket name.
- `bucket` (Required) : The name of the S3 bucket to which objects are written. The user must provide the bucket name.

- `key_path_prefix`(optional): key_path_prefix nothing but directory structure inside bucket in-order to store objects.
- `key_path_prefix` (Optional) : The key prefix (directory structure) inside the bucket under which objects are stored.

- `file_pattern`(optional): s3-bucket object will be created based on the file-pattern (logs-${yyyy-MM-dd}), the file-pattern based on the configuration or default.
- `file_pattern` (Required) : The S3 object name is created from the pattern `${USER-DEFINED-PORTION}-${EPOCH_SECONDS}-${RANDOM}.${EXTENSION}`. Example pattern: `event-${yyyy-MM-dd HH-MM-SS}`.

- `event_count`(required): An integer value indicates the maximum number events required to ingest into s3-bucket as part of threshold.
- `event_count` (Required) : The maximum number of events to collect before the accumulated events are written to the S3 bucket (threshold setting).

- `byte_capacity`(required): A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold.
- `maximum_size` (Optional) : A String representing the maximum number of bytes to accumulate before the events are written to the S3 bucket (threshold setting), e.g. `50mb`.

- `event_collection_duration`(required): A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
- `event_collection_duration` (Required) : How long events are collected before they are written to the S3 bucket (threshold setting). Duration values are strings that support ISO_8601 notation ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds ("60s") and milliseconds ("1500ms"); see the sketch after this list.
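As a quick illustration of the accepted duration formats, the sketch below uses plain `java.time`. Note that the simple `"60s"`/`"1500ms"` forms are accepted by Data Prepper's pipeline configuration parsing, not by `Duration.parse`, which only handles the ISO_8601 forms.

```java
import java.time.Duration;

public class DurationFormats {
    public static void main(String[] args) {
        // ISO_8601 forms parse directly with java.time.
        Duration pt3m = Duration.parse("PT3M");        // 180 seconds
        Duration pt20s = Duration.parse("PT20.345S");  // 20.345 seconds
        System.out.println(pt3m.getSeconds() + " " + pt20s.toMillis());
        // "60s" and "1500ms" are simple-notation strings accepted in pipeline
        // configuration; Duration.parse would reject them, so they are not shown here.
    }
}
```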



@@ -30,6 +30,11 @@
import software.amazon.awssdk.services.s3.S3Client;

class S3ObjectWorkerIT {

private static final String DEFAULT_CODEC_FILE_EXTENSION = "json";
private static final int EVENT_QUEUE_SIZE = 100000;
private static final long DURATION = 20L;
private static final String BYTE_CAPACITY = "50mb";
private S3Client s3Client;
private final BlockingQueue<Event> eventQueue;
private String bucket;
@@ -38,7 +43,6 @@ class S3ObjectWorkerIT {
private S3SinkService s3SinkService;
private ThresholdOptions thresholdOptions;
private ObjectOptions objectOptions;
private static final int EVENT_QUEUE_SIZE = 100000;

public S3ObjectWorkerIT() {
eventQueue = new ArrayBlockingQueue<>(EVENT_QUEUE_SIZE);
@@ -60,14 +64,15 @@ public void setUp() {

when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions);
when(thresholdOptions.getEventCount()).thenReturn(10);
when(thresholdOptions.getByteCapacity()).thenReturn(ByteCount.parse("5mb"));
when(thresholdOptions.getEventCollectionDuration()).thenReturn(Duration.parse("PT1M"));
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse(BYTE_CAPACITY));
when(thresholdOptions.getEventCollectionDuration()).thenReturn(Duration.ofSeconds(DURATION));
}

@Test
void copy_s3_object_correctly_into_s3_bucket() {
String codecFileExtension = DEFAULT_CODEC_FILE_EXTENSION;
RecordsGenerator recordsGenerator = new JsonRecordsGenerator();
s3SinkWorker = new S3SinkWorker(s3Client, s3SinkConfig, recordsGenerator.getCodec());
s3SinkWorker = new S3SinkWorker(s3Client, s3SinkConfig, recordsGenerator.getCodec(), codecFileExtension);
Collection<Record<Event>> records = setEventQueue();
s3SinkService.processRecods(records);
s3SinkService.accumulateBufferEvents(s3SinkWorker);
@@ -29,11 +29,13 @@
public class S3Sink extends AbstractSink<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(S3Sink.class);
private static final String DEFAULT_CODEC_FILE_EXTENSION = "json";
private final S3SinkConfig s3SinkConfig;
private final Codec codec;
private S3SinkWorker sinkWorker;
private volatile boolean sinkInitialized;
private S3SinkService s3SinkService;
private String codecFileExtension = null;

/**
* @param pluginSetting
@@ -46,6 +48,10 @@ public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig
super(pluginSetting);
this.s3SinkConfig = s3SinkConfig;
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
codecFileExtension = codecConfiguration.getPluginName();
if (codecFileExtension == null || codecFileExtension.isEmpty()) {
codecFileExtension = DEFAULT_CODEC_FILE_EXTENSION;
}
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
@@ -77,7 +83,7 @@ public void doInitialize() {
*/
private void doInitializeInternal() {
s3SinkService = new S3SinkService(s3SinkConfig);
sinkWorker = new S3SinkWorker(s3SinkService.createS3Client(), s3SinkConfig, codec);
sinkWorker = new S3SinkWorker(s3SinkService.createS3Client(), s3SinkConfig, codec, codecFileExtension);
sinkInitialized = Boolean.TRUE;
}
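The net effect of the new constructor logic is that the object file extension follows the configured codec's plugin name, with `json` as the fallback. Below is a minimal sketch of that rule; the helper name is hypothetical, since the plugin applies the check inline in the constructor.

```java
public class CodecExtensionSketch {
    private static final String DEFAULT_CODEC_FILE_EXTENSION = "json";

    // Hypothetical helper illustrating the fallback applied to the codec plugin name.
    static String resolveExtension(String codecPluginName) {
        return (codecPluginName == null || codecPluginName.isEmpty())
                ? DEFAULT_CODEC_FILE_EXTENSION
                : codecPluginName;
    }

    public static void main(String[] args) {
        System.out.println(resolveExtension("json")); // json
        System.out.println(resolveExtension(""));     // json
    }
}
```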

@@ -18,6 +18,10 @@
*/
public class S3SinkConfig {

private static final String DEFAULT_STORAGE = "in_memory";
private static final int DEFAULT_CONNECTION_RETRIES = 5;
private static final int DEFAULT_UPLOAD_RETRIES = 5;

@JsonProperty("aws")
@NotNull
@Valid
@@ -36,24 +40,20 @@ public class S3SinkConfig {
private PluginModel codec;

@JsonProperty("temporary_storage")
@NotNull
private String temporaryStorage;
private String temporaryStorage = DEFAULT_STORAGE;

@JsonProperty("bucket")
@NotNull
private String bucketName;

@JsonProperty("key_path_prefix")
@NotNull
private String keyPathPrefix;

@JsonProperty("s3_client_connect_retries")
@NotNull
private int s3ClientConnectRetries;
@JsonProperty("max_connection_retries")
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonProperty("s3_object_upload_retries")
@NotNull
private int s3ObjectUploadRetries;
@JsonProperty("max_upload_retries")
private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;

/**
* Aws Authentication configuration Options
@@ -107,14 +107,14 @@ public String getTemporaryStorage() {
/**
* s3 client connection retries configuration Options
*/
public int getS3ClientConnectRetries() {
return s3ClientConnectRetries;
public int getMaxConnectionRetries() {
return maxConnectionRetries;
}

/**
* s3 object upload retries configuration Options
*/
public int getS3ObjectUploadRetries() {
return s3ObjectUploadRetries;
public int getMaxUploadRetries() {
return maxUploadRetries;
}
}
@@ -71,7 +71,7 @@ public S3Client createS3Client() {
return S3Client.builder().region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(s3SinkConfig.getS3ClientConnectRetries()).build())
.retryPolicy(RetryPolicy.builder().numRetries(s3SinkConfig.getMaxConnectionRetries()).build())
.build())
.build();
}
@@ -32,6 +32,7 @@ public class S3SinkWorker {
private final S3SinkConfig s3SinkConfig;
private final Codec codec;
private BufferAccumulator accumulator;
private final String codecFileExtension;
private final int numEvents;
private final ByteCount byteCapacity;
private final long duration;
@@ -40,13 +41,16 @@
* @param s3Client
* @param s3SinkConfig
* @param codec
* @param codecFileExtension
*/
public S3SinkWorker(final S3Client s3Client, final S3SinkConfig s3SinkConfig, final Codec codec) {
public S3SinkWorker(final S3Client s3Client, final S3SinkConfig s3SinkConfig, final Codec codec,
final String codecFileExtension) {
this.s3Client = s3Client;
this.s3SinkConfig = s3SinkConfig;
this.codec = codec;
this.codecFileExtension = codecFileExtension;
numEvents = s3SinkConfig.getThresholdOptions().getEventCount();
byteCapacity = s3SinkConfig.getThresholdOptions().getByteCapacity();
byteCapacity = s3SinkConfig.getThresholdOptions().getMaximumSize();
duration = s3SinkConfig.getThresholdOptions().getEventCollectionDuration().getSeconds();

}
@@ -71,7 +75,7 @@ public BufferAccumulator bufferAccumulator(BlockingQueue<Event> eventQueue) {
Event event = eventQueue.take();
String jsonSerEvent = codec.parse(event);
byteCount += jsonSerEvent.getBytes().length;
localFileEventSet.add(jsonSerEvent);
localFileEventSet.add(codec.parse(event).concat(System.lineSeparator()));
eventCount++;
data++;
eventCollectionDuration = watch.getTime(TimeUnit.SECONDS);
@@ -82,7 +86,8 @@ public BufferAccumulator bufferAccumulator(BlockingQueue<Event> eventQueue) {
"Snapshot info : Byte_count = {} Bytes, " + "\t Event_count = {} Records "
+ "\t & Event_collection_duration = {} Sec",
byteCount, eventCount, eventCollectionDuration);
accumulator = new BufferAccumulator(localFileEventSet, s3Client, s3SinkConfig, eventData);
accumulator = new BufferAccumulator(localFileEventSet, s3Client, s3SinkConfig, eventData,
codecFileExtension);
} catch (InterruptedException e) {
LOG.error("Exception while storing recoreds to buffer", e);
Thread.currentThread().interrupt();
@@ -102,7 +107,11 @@ public BufferAccumulator bufferAccumulator(BlockingQueue<Event> eventQueue) {
* @return
*/
private boolean thresholdsCheck(int eventCount, StopWatch watch, int byteCount) {
return eventCount < numEvents && watch.getTime(TimeUnit.SECONDS) < duration
&& byteCount < byteCapacity.getBytes();
if (eventCount > 0) {
return eventCount < numEvents && watch.getTime(TimeUnit.SECONDS) < duration
&& byteCount < byteCapacity.getBytes();
} else {
return watch.getTime(TimeUnit.SECONDS) < duration && byteCount < byteCapacity.getBytes();
}
}
}
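The updated `thresholdsCheck` keeps accumulating while every configured limit still has headroom, and skips the event-count check while the buffer is empty. The standalone restatement below is a sketch of that rule; the method name and the threshold values in the comments (taken from the README example) are illustrative assumptions.

```java
public class ThresholdSketch {
    // Keep accumulating while every limit still has headroom; an empty buffer
    // ignores the event-count limit so the duration and byte checks alone decide.
    static boolean keepAccumulating(int eventCount, long elapsedSeconds, long byteCount,
                                    int maxEvents, long maxSeconds, long maxBytes) {
        if (eventCount > 0) {
            return eventCount < maxEvents && elapsedSeconds < maxSeconds && byteCount < maxBytes;
        }
        return elapsedSeconds < maxSeconds && byteCount < maxBytes;
    }

    public static void main(String[] args) {
        // event_count = 2000, event_collection_duration = 20s, byte_capacity = 50mb (52,428,800 bytes)
        System.out.println(keepAccumulating(1999, 10, 1_000_000, 2000, 20, 52_428_800L)); // true: all limits have headroom
        System.out.println(keepAccumulating(2000, 10, 1_000_000, 2000, 20, 52_428_800L)); // false: event_count reached
    }
}
```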
@@ -45,27 +45,32 @@ public class BufferAccumulator {
private StringBuilder eventBuilder;
private final String temporaryStorage;
private final int maxObjectUploadRetry;
private final String codecFileExtension;

/**
* @param localEventSet
* @param s3Client
* @param s3SinkConfig
* @param eventDataDb
* @param codecFileExtension
*/
public BufferAccumulator(final NavigableSet<String> localEventSet, final S3Client s3Client,
final S3SinkConfig s3SinkConfig, DB eventDataDb) {
final S3SinkConfig s3SinkConfig, DB eventDataDb, final String codecFileExtension) {
this.localEventSet = localEventSet;
this.s3Client = s3Client;
this.s3SinkConfig = s3SinkConfig;
this.eventDataDb = eventDataDb;
this.codecFileExtension = codecFileExtension;
temporaryStorage = s3SinkConfig.getTemporaryStorage();
maxObjectUploadRetry = s3SinkConfig.getS3ObjectUploadRetries();
maxObjectUploadRetry = s3SinkConfig.getMaxUploadRetries();
}

/**
* Writing data to local-file/in-memory based on temporary_storage configuration
*/
public void doAccumulate() {
s3ObjectFileName = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern());
s3ObjectFileName = S3ObjectIndex.getIndexAliasWithDate(s3SinkConfig.getObjectOptions().getFilePattern()) + "."
+ codecFileExtension;
eventBuilder = new StringBuilder();
if (temporaryStorage.equals(LOCAL_FILE)) {
localFileProcessor();
@@ -7,9 +7,7 @@

import java.util.Map;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonProperty;

import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
