diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java index 1346163039..2e312631f4 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/S3Sink.java @@ -16,7 +16,9 @@ import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions; import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBufferFactory; +import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileBufferFactory; import org.opensearch.dataprepper.plugins.sink.codec.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +54,11 @@ public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings); sinkInitialized = Boolean.FALSE; - bufferFactory = new InMemoryBufferFactory(); + if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) { + bufferFactory = new LocalFileBufferFactory(); + } else { + bufferFactory = new InMemoryBufferFactory(); + } } @Override diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java index ba2b36928e..5200f8c741 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/BufferTypeOptions.java @@ -15,19 +15,20 @@ */ public enum BufferTypeOptions { - INMEMORY("in_memory", new InMemoryBuffer()); + INMEMORY("in_memory", new InMemoryBufferFactory()), + LOCALFILE("local_file", new LocalFileBufferFactory()); private final String option; - private final Buffer bufferType; + private final BufferFactory bufferType; private static final Map OPTIONS_MAP = Arrays.stream(BufferTypeOptions.values()) .collect(Collectors.toMap(value -> value.option, value -> value)); - BufferTypeOptions(final String option, final Buffer bufferType) { + BufferTypeOptions(final String option, final BufferFactory bufferType) { this.option = option.toLowerCase(); this.bufferType = bufferType; } - public Buffer getBufferType() { + public BufferFactory getBufferType() { return bufferType; } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java new file mode 100644 index 0000000000..da5c9faa1a --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBuffer.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold local file data and flushing it to S3. + */ +public class LocalFileBuffer implements Buffer { + + private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class); + private final OutputStream outputStream; + private int eventCount; + private final StopWatch watch; + private final File localFile; + + LocalFileBuffer(File tempFile) throws FileNotFoundException { + localFile = tempFile; + outputStream = new BufferedOutputStream(new FileOutputStream(tempFile)); + eventCount = 0; + watch = new StopWatch(); + watch.start(); + } + + @Override + public long getSize() { + try { + outputStream.flush(); + } catch (IOException e) { + LOG.error("An exception occurred while flushing data to buffered output stream :", e); + } + return localFile.length(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + @Override + public long getDuration(){ + return watch.getTime(TimeUnit.SECONDS); + } + + /** + * Upload accumulated data to amazon s3. + * @param s3Client s3 client object. + * @param bucket bucket name. + * @param key s3 object key path. + */ + @Override + public void flushToS3(S3Client s3Client, String bucket, String key) { + flushAndCloseStream(); + s3Client.putObject( + PutObjectRequest.builder().bucket(bucket).key(key).build(), + RequestBody.fromFile(localFile)); + removeTemporaryFile(); + } + + /** + * write byte array to output stream. + * @param bytes byte array. + * @throws IOException while writing to output stream fails. + */ + @Override + public void writeEvent(byte[] bytes) throws IOException { + outputStream.write(bytes); + outputStream.write(System.lineSeparator().getBytes()); + eventCount++; + } + + /** + * Flushing the buffered data into the output stream. + */ + protected void flushAndCloseStream(){ + try { + outputStream.flush(); + outputStream.close(); + } catch (IOException e) { + LOG.error("An exception occurred while flushing data to buffered output stream :", e); + } + } + + /** + * Remove the local temp file after flushing data to s3. + */ + protected void removeTemporaryFile() { + if (localFile != null) { + try { + Files.deleteIfExists(Paths.get(localFile.toString())); + } catch (IOException e) { + LOG.error("Unable to delete Local file {}", localFile, e); + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactory.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactory.java new file mode 100644 index 0000000000..3d27ed08b8 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactory.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; + +public class LocalFileBufferFactory implements BufferFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LocalFileBufferFactory.class); + public static final String PREFIX = "local"; + public static final String SUFFIX = ".log"; + @Override + public Buffer getBuffer() { + File tempFile = null; + Buffer localfileBuffer = null; + try { + tempFile = File.createTempFile(PREFIX, SUFFIX); + localfileBuffer = new LocalFileBuffer(tempFile); + } catch (IOException e) { + LOG.error("Unable to create temp file ", e); + } + return localfileBuffer; + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactoryTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactoryTest.java new file mode 100644 index 0000000000..dc2d3b0432 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferFactoryTest.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +class LocalFileBufferFactoryTest { + @Test + void test_localFileBufferFactory_notNull() { + LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); + Assertions.assertNotNull(localFileBufferFactory); + } + + @Test + void test_buffer_notNull() { + LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); + Assertions.assertNotNull(localFileBufferFactory); + Buffer buffer = localFileBufferFactory.getBuffer(); + Assertions.assertNotNull(buffer); + assertThat(buffer, instanceOf(LocalFileBuffer.class)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java new file mode 100644 index 0000000000..65ad70963d --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/accumulator/LocalFileBufferTest.java @@ -0,0 +1,125 @@ +package org.opensearch.dataprepper.plugins.sink.accumulator; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class LocalFileBufferTest { + + public static final String BUCKET_NAME = UUID.randomUUID().toString(); + public static final String KEY = UUID.randomUUID().toString() + ".log"; + public static final String PREFIX = "local"; + public static final String SUFFIX = ".log"; + @Mock + private S3Client s3Client; + private LocalFileBuffer localFileBuffer; + private File tempFile; + + @BeforeEach + void setUp() throws IOException { + tempFile = File.createTempFile(PREFIX, SUFFIX); + localFileBuffer = new LocalFileBuffer(tempFile); + } + + @Test + void test_with_write_events_into_buffer() throws IOException { + while (localFileBuffer.getEventCount() < 55) { + localFileBuffer.writeEvent(generateByteArray()); + } + assertThat(localFileBuffer.getSize(), greaterThan(1l)); + assertThat(localFileBuffer.getEventCount(), equalTo(55)); + assertThat(localFileBuffer.getDuration(), equalTo(0L)); + localFileBuffer.flushAndCloseStream(); + localFileBuffer.removeTemporaryFile(); + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @Test + void test_without_write_events_into_buffer() { + assertThat(localFileBuffer.getSize(), equalTo(0L)); + assertThat(localFileBuffer.getEventCount(), equalTo(0)); + assertThat(localFileBuffer.getDuration(), equalTo(0L)); + localFileBuffer.flushAndCloseStream(); + localFileBuffer.removeTemporaryFile(); + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @Test + void test_with_write_events_into_buffer_and_flush_toS3() throws IOException { + while (localFileBuffer.getEventCount() < 55) { + localFileBuffer.writeEvent(generateByteArray()); + } + assertThat(localFileBuffer.getSize(), greaterThan(1l)); + assertThat(localFileBuffer.getEventCount(), equalTo(55)); + assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(0L)); + + assertDoesNotThrow(() -> { + localFileBuffer.flushToS3(s3Client, BUCKET_NAME, KEY); + }); + + ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class)); + PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue(); + + assertThat(actualRequest, notNullValue()); + assertThat(actualRequest.bucket(), equalTo(BUCKET_NAME)); + assertThat(actualRequest.key(), equalTo(KEY)); + assertThat(actualRequest.expectedBucketOwner(), nullValue()); + + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @Test + void test_uploadedToS3_success() { + Assertions.assertNotNull(localFileBuffer); + assertDoesNotThrow(() -> { + localFileBuffer.flushToS3(s3Client, BUCKET_NAME, KEY); + }); + + ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); + verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class)); + PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue(); + + assertThat(actualRequest, notNullValue()); + assertThat(actualRequest.bucket(), equalTo(BUCKET_NAME)); + assertThat(actualRequest.key(), equalTo(KEY)); + assertThat(actualRequest.expectedBucketOwner(), nullValue()); + + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @AfterEach + void cleanup() { + tempFile.deleteOnExit(); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[1000]; + for (int i = 0; i < 1000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} \ No newline at end of file