Skip to content

Commit

Permalink
Updated the name of the metrics for the new S3 sink to match the name…
Browse files Browse the repository at this point in the history
…s in the S3 source for consistency. Some test clean-up, and updated the README.md with development instructions. (#2741)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed May 25, 2023
1 parent 3ce5b53 commit d7c2368
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 69 deletions.
9 changes: 9 additions & 0 deletions data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,12 @@ This plugin is compatible with Java 11. See below

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```

Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public void setUp() {
when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(s3region));

lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_SUCCESS)).thenReturn(snapshotSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_FAILED)).thenReturn(snapshotFailedCounter);
lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_SUCCEEDED)).thenReturn(snapshotSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_FAILED)).thenReturn(snapshotFailedCounter);
lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@

package org.opensearch.dataprepper.plugins.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -27,18 +23,23 @@
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Class responsible for create {@link S3Client} object, check thresholds,
* get new buffer and write records into buffer.
*/
public class S3SinkService {

private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
public static final String SNAPSHOT_SUCCESS = "snapshotSuccess";
public static final String SNAPSHOT_FAILED = "snapshotFailed";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "numberOfRecordsFlushedToS3Success";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "numberOfRecordsFlushedToS3Failed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeRecords";
public static final String OBJECTS_SUCCEEDED = "s3ObjectsSucceeded";
public static final String OBJECTS_FAILED = "s3ObjectsFailed";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "s3ObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "s3ObjectsEventsFailed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final BufferFactory bufferFactory;
Expand All @@ -49,8 +50,8 @@ public class S3SinkService {
private final long maxCollectionDuration;
private final String bucket;
private final int maxRetries;
private final Counter snapshotSuccessCounter;
private final Counter snapshotFailedCounter;
private final Counter objectsSucceededCounter;
private final Counter objectsFailedCounter;
private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;
private final DistributionSummary s3ObjectSizeSummary;
Expand All @@ -62,7 +63,7 @@ public class S3SinkService {
* @param pluginMetrics metrics.
*/
public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory,
final Codec codec, PluginMetrics pluginMetrics) {
final Codec codec, final PluginMetrics pluginMetrics) {
this.s3SinkConfig = s3SinkConfig;
this.bufferFactory = bufferFactory;
this.codec = codec;
Expand All @@ -75,8 +76,8 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
bucket = s3SinkConfig.getBucketOptions().getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();

snapshotSuccessCounter = pluginMetrics.counter(SNAPSHOT_SUCCESS);
snapshotFailedCounter = pluginMetrics.counter(SNAPSHOT_FAILED);
objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED);
objectsFailedCounter = pluginMetrics.counter(OBJECTS_FAILED);
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
Expand All @@ -100,19 +101,19 @@ void output(Collection<Record<Event>> records) {

currentBuffer.writeEvent(encodedBytes);
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
s3ObjectSizeSummary.record(currentBuffer.getEventCount());
LOG.info("Event collection Object info : Byte_capacity = {} Bytes," +
" Event_count = {} Records & Event_collection_duration = {} Sec",
currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
boolean isFlushToS3 = retryFlushToS3(currentBuffer);
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Event collection Object uploaded successfully");
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
snapshotSuccessCounter.increment();
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
} else {
LOG.info("Event collection Object upload failed");
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
snapshotFailedCounter.increment();
objectsFailedCounter.increment();
}
currentBuffer = bufferFactory.getBuffer();
}
Expand All @@ -126,16 +127,18 @@ void output(Collection<Record<Event>> records) {

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
* @param currentBuffer current buffer.
* @param s3Key
* @return boolean based on object upload status.
* @throws InterruptedException interruption during sleep.
*/
protected boolean retryFlushToS3(Buffer currentBuffer) throws InterruptedException {
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
boolean isUploadedToS3 = Boolean.FALSE;
int retryCount = maxRetries;
do {
try {
currentBuffer.flushToS3(createS3Client(), bucket, generateKey());
currentBuffer.flushToS3(createS3Client(), bucket, s3Key);
isUploadedToS3 = Boolean.TRUE;
} catch (AwsServiceException | SdkClientException e) {
LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:",
Expand Down
Loading

0 comments on commit d7c2368

Please sign in to comment.