Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to the S3 sink: metric names, README #2741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,12 @@ This plugin is compatible with Java 11. See below

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```

Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public void setUp() {
when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(s3region));

lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_SUCCESS)).thenReturn(snapshotSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_FAILED)).thenReturn(snapshotFailedCounter);
lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_SUCCEEDED)).thenReturn(snapshotSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_FAILED)).thenReturn(snapshotFailedCounter);
lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS)).
thenReturn(numberOfRecordsSuccessCounter);
lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@

package org.opensearch.dataprepper.plugins.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -27,18 +23,23 @@
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Class responsible for create {@link S3Client} object, check thresholds,
* get new buffer and write records into buffer.
*/
public class S3SinkService {

private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
public static final String SNAPSHOT_SUCCESS = "snapshotSuccess";
public static final String SNAPSHOT_FAILED = "snapshotFailed";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "numberOfRecordsFlushedToS3Success";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "numberOfRecordsFlushedToS3Failed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeRecords";
public static final String OBJECTS_SUCCEEDED = "s3ObjectsSucceeded";
public static final String OBJECTS_FAILED = "s3ObjectsFailed";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "s3ObjectsEventsSucceeded";
public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "s3ObjectsEventsFailed";
static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes";
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final BufferFactory bufferFactory;
Expand All @@ -49,8 +50,8 @@ public class S3SinkService {
private final long maxCollectionDuration;
private final String bucket;
private final int maxRetries;
private final Counter snapshotSuccessCounter;
private final Counter snapshotFailedCounter;
private final Counter objectsSucceededCounter;
private final Counter objectsFailedCounter;
private final Counter numberOfRecordsSuccessCounter;
private final Counter numberOfRecordsFailedCounter;
private final DistributionSummary s3ObjectSizeSummary;
Expand All @@ -62,7 +63,7 @@ public class S3SinkService {
* @param pluginMetrics metrics.
*/
public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory,
final Codec codec, PluginMetrics pluginMetrics) {
final Codec codec, final PluginMetrics pluginMetrics) {
this.s3SinkConfig = s3SinkConfig;
this.bufferFactory = bufferFactory;
this.codec = codec;
Expand All @@ -75,8 +76,8 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
bucket = s3SinkConfig.getBucketOptions().getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();

snapshotSuccessCounter = pluginMetrics.counter(SNAPSHOT_SUCCESS);
snapshotFailedCounter = pluginMetrics.counter(SNAPSHOT_FAILED);
objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED);
objectsFailedCounter = pluginMetrics.counter(OBJECTS_FAILED);
numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS);
numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED);
s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE);
Expand All @@ -100,19 +101,19 @@ void output(Collection<Record<Event>> records) {

currentBuffer.writeEvent(encodedBytes);
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
s3ObjectSizeSummary.record(currentBuffer.getEventCount());
LOG.info("Event collection Object info : Byte_capacity = {} Bytes," +
" Event_count = {} Records & Event_collection_duration = {} Sec",
currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
boolean isFlushToS3 = retryFlushToS3(currentBuffer);
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
s3Key, currentBuffer.getEventCount(), currentBuffer.getSize());
final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key);
if (isFlushToS3) {
LOG.info("Event collection Object uploaded successfully");
LOG.info("Successfully saved {} to S3.", s3Key);
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
snapshotSuccessCounter.increment();
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
} else {
LOG.info("Event collection Object upload failed");
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
snapshotFailedCounter.increment();
objectsFailedCounter.increment();
}
currentBuffer = bufferFactory.getBuffer();
}
Expand All @@ -126,16 +127,18 @@ void output(Collection<Record<Event>> records) {

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
* @param currentBuffer current buffer.
* @param s3Key
* @return boolean based on object upload status.
* @throws InterruptedException interruption during sleep.
*/
protected boolean retryFlushToS3(Buffer currentBuffer) throws InterruptedException {
protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException {
boolean isUploadedToS3 = Boolean.FALSE;
int retryCount = maxRetries;
do {
try {
currentBuffer.flushToS3(createS3Client(), bucket, generateKey());
currentBuffer.flushToS3(createS3Client(), bucket, s3Key);
isUploadedToS3 = Boolean.TRUE;
} catch (AwsServiceException | SdkClientException e) {
LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:",
Expand Down
Loading