diff --git a/data-prepper-plugins/s3-sink/README.md b/data-prepper-plugins/s3-sink/README.md index f3ebfeb23b..c8ab865ba2 100644 --- a/data-prepper-plugins/s3-sink/README.md +++ b/data-prepper-plugins/s3-sink/README.md @@ -64,3 +64,12 @@ This plugin is compatible with Java 11. See below - [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) - [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) + +The integration tests for this plugin do not run as part of the Data Prepper build. + +The following command runs the integration tests: + +``` +./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region= -Dtests.s3sink.bucket= +``` + diff --git a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java index 05243acc5d..ada78fa9cb 100644 --- a/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java +++ b/data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceIT.java @@ -102,8 +102,8 @@ public void setUp() { when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(s3region)); - lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_SUCCESS)).thenReturn(snapshotSuccessCounter); - lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_FAILED)).thenReturn(snapshotFailedCounter); + lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_SUCCEEDED)).thenReturn(snapshotSuccessCounter); + lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_FAILED)).thenReturn(snapshotFailedCounter); lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS)). thenReturn(numberOfRecordsSuccessCounter); lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)). diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java index 601d37b79c..6aebbd4b94 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3SinkService.java @@ -5,10 +5,6 @@ package org.opensearch.dataprepper.plugins.sink; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.metrics.PluginMetrics; @@ -27,6 +23,11 @@ import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.services.s3.S3Client; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * Class responsible for create {@link S3Client} object, check thresholds, * get new buffer and write records into buffer. @@ -34,11 +35,11 @@ public class S3SinkService { private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class); - public static final String SNAPSHOT_SUCCESS = "snapshotSuccess"; - public static final String SNAPSHOT_FAILED = "snapshotFailed"; - public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "numberOfRecordsFlushedToS3Success"; - public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "numberOfRecordsFlushedToS3Failed"; - static final String S3_OBJECTS_SIZE = "s3ObjectSizeRecords"; + public static final String OBJECTS_SUCCEEDED = "s3ObjectsSucceeded"; + public static final String OBJECTS_FAILED = "s3ObjectsFailed"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS = "s3ObjectsEventsSucceeded"; + public static final String NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED = "s3ObjectsEventsFailed"; + static final String S3_OBJECTS_SIZE = "s3ObjectSizeBytes"; private final S3SinkConfig s3SinkConfig; private final Lock reentrantLock; private final BufferFactory bufferFactory; @@ -49,8 +50,8 @@ public class S3SinkService { private final long maxCollectionDuration; private final String bucket; private final int maxRetries; - private final Counter snapshotSuccessCounter; - private final Counter snapshotFailedCounter; + private final Counter objectsSucceededCounter; + private final Counter objectsFailedCounter; private final Counter numberOfRecordsSuccessCounter; private final Counter numberOfRecordsFailedCounter; private final DistributionSummary s3ObjectSizeSummary; @@ -62,7 +63,7 @@ public class S3SinkService { * @param pluginMetrics metrics. */ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory bufferFactory, - final Codec codec, PluginMetrics pluginMetrics) { + final Codec codec, final PluginMetrics pluginMetrics) { this.s3SinkConfig = s3SinkConfig; this.bufferFactory = bufferFactory; this.codec = codec; @@ -75,8 +76,8 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer bucket = s3SinkConfig.getBucketOptions().getBucketName(); maxRetries = s3SinkConfig.getMaxUploadRetries(); - snapshotSuccessCounter = pluginMetrics.counter(SNAPSHOT_SUCCESS); - snapshotFailedCounter = pluginMetrics.counter(SNAPSHOT_FAILED); + objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED); + objectsFailedCounter = pluginMetrics.counter(OBJECTS_FAILED); numberOfRecordsSuccessCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS); numberOfRecordsFailedCounter = pluginMetrics.counter(NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED); s3ObjectSizeSummary = pluginMetrics.summary(S3_OBJECTS_SIZE); @@ -100,19 +101,19 @@ void output(Collection> records) { currentBuffer.writeEvent(encodedBytes); if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) { - s3ObjectSizeSummary.record(currentBuffer.getEventCount()); - LOG.info("Event collection Object info : Byte_capacity = {} Bytes," + - " Event_count = {} Records & Event_collection_duration = {} Sec", - currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration()); - boolean isFlushToS3 = retryFlushToS3(currentBuffer); + final String s3Key = generateKey(); + LOG.info("Writing {} to S3 with {} events and size of {} bytes.", + s3Key, currentBuffer.getEventCount(), currentBuffer.getSize()); + final boolean isFlushToS3 = retryFlushToS3(currentBuffer, s3Key); if (isFlushToS3) { - LOG.info("Event collection Object uploaded successfully"); + LOG.info("Successfully saved {} to S3.", s3Key); numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount()); - snapshotSuccessCounter.increment(); + objectsSucceededCounter.increment(); + s3ObjectSizeSummary.record(currentBuffer.getSize()); } else { - LOG.info("Event collection Object upload failed"); + LOG.error("Failed to save {} to S3.", s3Key); numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount()); - snapshotFailedCounter.increment(); + objectsFailedCounter.increment(); } currentBuffer = bufferFactory.getBuffer(); } @@ -126,16 +127,18 @@ void output(Collection> records) { /** * perform retry in-case any issue occurred, based on max_upload_retries configuration. + * * @param currentBuffer current buffer. + * @param s3Key * @return boolean based on object upload status. * @throws InterruptedException interruption during sleep. */ - protected boolean retryFlushToS3(Buffer currentBuffer) throws InterruptedException { + protected boolean retryFlushToS3(final Buffer currentBuffer, final String s3Key) throws InterruptedException { boolean isUploadedToS3 = Boolean.FALSE; int retryCount = maxRetries; do { try { - currentBuffer.flushToS3(createS3Client(), bucket, generateKey()); + currentBuffer.flushToS3(createS3Client(), bucket, s3Key); isUploadedToS3 = Boolean.TRUE; } catch (AwsServiceException | SdkClientException e) { LOG.error("Exception occurred while uploading records to s3 bucket. Retry countdown : {} | exception:", diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java index 6c2eea1e75..31f7c1a37e 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/S3SinkServiceTest.java @@ -5,29 +5,6 @@ package org.opensearch.dataprepper.plugins.sink; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.io.IOException; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.TimeZone; -import java.util.UUID; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import org.junit.jupiter.api.BeforeEach; @@ -51,10 +28,40 @@ import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions; import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions; import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Random; +import java.util.TimeZone; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + class S3SinkServiceTest { public static final int MAX_EVENTS = 10; @@ -66,15 +73,17 @@ class S3SinkServiceTest { public static final String CODEC_PLUGIN_NAME = "json"; public static final String PATH_PREFIX = "logdata/"; private S3SinkConfig s3SinkConfig; - private AwsCredentialsProvider awsCredentialsProvider; private JsonCodec codec; private PluginMetrics pluginMetrics; private BufferFactory bufferFactory; private Counter snapshotSuccessCounter; + private DistributionSummary s3ObjectSizeSummary; + private Random random; @BeforeEach void setUp() throws Exception { + random = new Random(); s3SinkConfig = mock(S3SinkConfig.class); ThresholdOptions thresholdOptions = mock(ThresholdOptions.class); BucketOptions bucketOptions = mock(BucketOptions.class); @@ -89,7 +98,7 @@ void setUp() throws Exception { Counter snapshotFailedCounter = mock(Counter.class); Counter numberOfRecordsSuccessCounter = mock(Counter.class); Counter numberOfRecordsFailedCounter = mock(Counter.class); - DistributionSummary s3ObjectSizeSummary = mock(DistributionSummary.class); + s3ObjectSizeSummary = mock(DistributionSummary.class); bufferFactory = new InMemoryBufferFactory(); @@ -106,13 +115,12 @@ void setUp() throws Exception { when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn(PATH_PREFIX); when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION)); - when(awsAuthenticationOptions.authenticateAwsConfiguration()).thenReturn(awsCredentialsProvider); when(s3SinkConfig.getCodec()).thenReturn(pluginModel); when(pluginModel.getPluginName()).thenReturn(CODEC_PLUGIN_NAME); when(pluginFactory.loadPlugin(Codec.class, pluginSetting)).thenReturn(codec); - lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_SUCCESS)).thenReturn(snapshotSuccessCounter); - lenient().when(pluginMetrics.counter(S3SinkService.SNAPSHOT_FAILED)).thenReturn(snapshotFailedCounter); + lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_SUCCEEDED)).thenReturn(snapshotSuccessCounter); + lenient().when(pluginMetrics.counter(S3SinkService.OBJECTS_FAILED)).thenReturn(snapshotFailedCounter); lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_SUCCESS)). thenReturn(numberOfRecordsSuccessCounter); lenient().when(pluginMetrics.counter(S3SinkService.NUMBER_OF_RECORDS_FLUSHED_TO_S3_FAILED)). @@ -120,16 +128,20 @@ void setUp() throws Exception { lenient().when(pluginMetrics.summary(S3SinkService.S3_OBJECTS_SIZE)).thenReturn(s3ObjectSizeSummary); } + private S3SinkService createObjectUnderTest() { + return new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + } + @Test void test_s3SinkService_notNull() { - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); } @Test void test_s3Client_notNull() { - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); S3Client s3Client = s3SinkService.createS3Client(); assertNotNull(s3Client); assertThat(s3Client, instanceOf(S3Client.class)); @@ -139,7 +151,7 @@ void test_s3Client_notNull() { void test_generateKey_with_general_prefix() { String pathPrefix = "events/"; when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); String key = s3SinkService.generateKey(); assertNotNull(key); assertThat(key, true); @@ -158,7 +170,7 @@ void test_generateKey_with_date_prefix() { when(s3SinkConfig.getBucketOptions().getObjectKeyOptions() .getPathPrefix()).thenReturn(pathPrefix + datePattern); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); String key = s3SinkService.generateKey(); assertNotNull(key); assertThat(key, true); @@ -175,7 +187,7 @@ void test_output_with_threshold_set_as_more_then_zero_event_count() throws IOExc when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(5); when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); verify(snapshotSuccessCounter, times(50)).increment(); @@ -196,7 +208,7 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { when(s3SinkConfig.getThresholdOptions().getEventCount()).thenReturn(0); when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse("2kb")); when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); s3SinkService.output(generateRandomStringEventRecord()); verify(snapshotSuccessCounter, times(50)).increment(); @@ -204,7 +216,8 @@ void test_output_with_threshold_set_as_zero_event_count() throws IOException { @Test void test_catch_output_exception_cover() { - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, null, pluginMetrics); + codec = null; + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); s3SinkService.output(generateRandomStringEventRecord()); @@ -221,25 +234,63 @@ void test_output_with_uploadedToS3_success() throws IOException { when(bufferFactory.getBuffer()).thenReturn(buffer); when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); s3SinkService.output(generateRandomStringEventRecord()); verify(snapshotSuccessCounter, times(50)).increment(); } + @Test + void test_output_with_uploadedToS3_success_records_byte_count() throws IOException { + + bufferFactory = mock(BufferFactory.class); + Buffer buffer = mock(Buffer.class); + when(bufferFactory.getBuffer()).thenReturn(buffer); + + final long objectSize = random.nextInt(1_000_000) + 10_000; + when(buffer.getSize()).thenReturn(objectSize); + + when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + final S3SinkService s3SinkService = createObjectUnderTest(); + s3SinkService.output(generateRandomStringEventRecord()); + + verify(s3ObjectSizeSummary, times(50)).record(objectSize); + } + @Test void test_output_with_uploadedToS3_failed() throws IOException { when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn(null); when(s3SinkConfig.getMaxUploadRetries()).thenReturn(3); when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}"); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertThat(s3SinkService, instanceOf(S3SinkService.class)); s3SinkService.output(generateLessRandomStringEventRecord()); verify(snapshotSuccessCounter, times(0)).increment(); } + @Test + void test_output_with_uploadedToS3_failure_does_not_record_byte_count() throws IOException { + + bufferFactory = mock(BufferFactory.class); + Buffer buffer = mock(Buffer.class); + when(bufferFactory.getBuffer()).thenReturn(buffer); + + doThrow(AwsServiceException.class).when(buffer).flushToS3(any(), anyString(), anyString()); + + final long objectSize = random.nextInt(1_000_000) + 10_000; + when(buffer.getSize()).thenReturn(objectSize); + + when(codec.parse(any())).thenReturn(UUID.randomUUID().toString()); + final S3SinkService s3SinkService = createObjectUnderTest(); + final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + s3SinkService.output(Collections.singletonList(new Record<>(event))); + + verify(s3ObjectSizeSummary, never()).record(anyLong()); + verify(buffer, times(3)).flushToS3(any(), anyString(), anyString()); + } + @Test void test_retryFlushToS3_positive() throws InterruptedException, IOException { @@ -248,23 +299,25 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException { doNothing().when(buffer).flushToS3(any(S3Client.class), anyString(), any(String.class)); when(bufferFactory.getBuffer()).thenReturn(buffer); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); assertNotNull(buffer); buffer.writeEvent(generateByteArray()); - boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer); + final String s3Key = UUID.randomUUID().toString(); + boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); assertTrue(isUploadedToS3); } @Test void test_retryFlushToS3_negative() throws InterruptedException, IOException { when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn(""); - S3SinkService s3SinkService = new S3SinkService(s3SinkConfig, bufferFactory, codec, pluginMetrics); + S3SinkService s3SinkService = createObjectUnderTest(); assertNotNull(s3SinkService); Buffer buffer = bufferFactory.getBuffer(); assertNotNull(buffer); buffer.writeEvent(generateByteArray()); - boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer); + final String s3Key = UUID.randomUUID().toString(); + boolean isUploadedToS3 = s3SinkService.retryFlushToS3(buffer, s3Key); assertFalse(isUploadedToS3); }