Skip to content

Commit

Permalink
Github-issue#1048 : s3-sink with local-file buffer implementation. (#…
Browse files Browse the repository at this point in the history
…2645)

* GitHub-issue#1048 : Rebase the code from DP main branch.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

* GitHub-issue#1048

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

* GitHub-issue#1048

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

* GitHub-issue#1048 : Incorporated review comments.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

* GitHub-issue#1048 : Incorporated review comments.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

* GitHub-issue#1048 : Incorporated review comments.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

---------

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
  • Loading branch information
deepaksahu562 committed May 16, 2023
1 parent b64e7e7 commit 9c7b3ec
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.accumulator.InMemoryBufferFactory;
import org.opensearch.dataprepper.plugins.sink.accumulator.LocalFileBufferFactory;
import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,7 +54,11 @@ public S3Sink(final PluginSetting pluginSetting, final S3SinkConfig s3SinkConfig
codec = pluginFactory.loadPlugin(Codec.class, codecPluginSettings);
sinkInitialized = Boolean.FALSE;

bufferFactory = new InMemoryBufferFactory();
if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
bufferFactory = new LocalFileBufferFactory();
} else {
bufferFactory = new InMemoryBufferFactory();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@
*/
public enum BufferTypeOptions {

INMEMORY("in_memory", new InMemoryBuffer());
INMEMORY("in_memory", new InMemoryBufferFactory()),
LOCALFILE("local_file", new LocalFileBufferFactory());

private final String option;
private final Buffer bufferType;
private final BufferFactory bufferType;
private static final Map<String, BufferTypeOptions> OPTIONS_MAP = Arrays.stream(BufferTypeOptions.values())
.collect(Collectors.toMap(value -> value.option, value -> value));

BufferTypeOptions(final String option, final Buffer bufferType) {
BufferTypeOptions(final String option, final BufferFactory bufferType) {
this.option = option.toLowerCase();
this.bufferType = bufferType;
}

public Buffer getBufferType() {
public BufferFactory getBufferType() {
return bufferType;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

/**
* A buffer can hold local file data and flushing it to S3.
*/
public class LocalFileBuffer implements Buffer {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class);
private final OutputStream outputStream;
private int eventCount;
private final StopWatch watch;
private final File localFile;

LocalFileBuffer(File tempFile) throws FileNotFoundException {
localFile = tempFile;
outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
eventCount = 0;
watch = new StopWatch();
watch.start();
}

@Override
public long getSize() {
try {
outputStream.flush();
} catch (IOException e) {
LOG.error("An exception occurred while flushing data to buffered output stream :", e);
}
return localFile.length();
}

@Override
public int getEventCount() {
return eventCount;
}

@Override
public long getDuration(){
return watch.getTime(TimeUnit.SECONDS);
}

/**
* Upload accumulated data to amazon s3.
* @param s3Client s3 client object.
* @param bucket bucket name.
* @param key s3 object key path.
*/
@Override
public void flushToS3(S3Client s3Client, String bucket, String key) {
flushAndCloseStream();
s3Client.putObject(
PutObjectRequest.builder().bucket(bucket).key(key).build(),
RequestBody.fromFile(localFile));
removeTemporaryFile();
}

/**
* write byte array to output stream.
* @param bytes byte array.
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
outputStream.write(bytes);
outputStream.write(System.lineSeparator().getBytes());
eventCount++;
}

/**
* Flushing the buffered data into the output stream.
*/
protected void flushAndCloseStream(){
try {
outputStream.flush();
outputStream.close();
} catch (IOException e) {
LOG.error("An exception occurred while flushing data to buffered output stream :", e);
}
}

/**
* Remove the local temp file after flushing data to s3.
*/
protected void removeTemporaryFile() {
if (localFile != null) {
try {
Files.deleteIfExists(Paths.get(localFile.toString()));
} catch (IOException e) {
LOG.error("Unable to delete Local file {}", localFile, e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;

public class LocalFileBufferFactory implements BufferFactory {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileBufferFactory.class);
public static final String PREFIX = "local";
public static final String SUFFIX = ".log";
@Override
public Buffer getBuffer() {
File tempFile = null;
Buffer localfileBuffer = null;
try {
tempFile = File.createTempFile(PREFIX, SUFFIX);
localfileBuffer = new LocalFileBuffer(tempFile);
} catch (IOException e) {
LOG.error("Unable to create temp file ", e);
}
return localfileBuffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;

class LocalFileBufferFactoryTest {
@Test
void test_localFileBufferFactory_notNull() {
LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory();
Assertions.assertNotNull(localFileBufferFactory);
}

@Test
void test_buffer_notNull() {
LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory();
Assertions.assertNotNull(localFileBufferFactory);
Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(LocalFileBuffer.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.opensearch.dataprepper.plugins.sink.accumulator;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
class LocalFileBufferTest {

public static final String BUCKET_NAME = UUID.randomUUID().toString();
public static final String KEY = UUID.randomUUID().toString() + ".log";
public static final String PREFIX = "local";
public static final String SUFFIX = ".log";
@Mock
private S3Client s3Client;
private LocalFileBuffer localFileBuffer;
private File tempFile;

@BeforeEach
void setUp() throws IOException {
tempFile = File.createTempFile(PREFIX, SUFFIX);
localFileBuffer = new LocalFileBuffer(tempFile);
}

@Test
void test_with_write_events_into_buffer() throws IOException {
while (localFileBuffer.getEventCount() < 55) {
localFileBuffer.writeEvent(generateByteArray());
}
assertThat(localFileBuffer.getSize(), greaterThan(1l));
assertThat(localFileBuffer.getEventCount(), equalTo(55));
assertThat(localFileBuffer.getDuration(), equalTo(0L));
localFileBuffer.flushAndCloseStream();
localFileBuffer.removeTemporaryFile();
assertFalse(tempFile.exists(), "The temp file has not been deleted.");
}

@Test
void test_without_write_events_into_buffer() {
assertThat(localFileBuffer.getSize(), equalTo(0L));
assertThat(localFileBuffer.getEventCount(), equalTo(0));
assertThat(localFileBuffer.getDuration(), equalTo(0L));
localFileBuffer.flushAndCloseStream();
localFileBuffer.removeTemporaryFile();
assertFalse(tempFile.exists(), "The temp file has not been deleted.");
}

@Test
void test_with_write_events_into_buffer_and_flush_toS3() throws IOException {
while (localFileBuffer.getEventCount() < 55) {
localFileBuffer.writeEvent(generateByteArray());
}
assertThat(localFileBuffer.getSize(), greaterThan(1l));
assertThat(localFileBuffer.getEventCount(), equalTo(55));
assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(0L));

assertDoesNotThrow(() -> {
localFileBuffer.flushToS3(s3Client, BUCKET_NAME, KEY);
});

ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class));
PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue();

assertThat(actualRequest, notNullValue());
assertThat(actualRequest.bucket(), equalTo(BUCKET_NAME));
assertThat(actualRequest.key(), equalTo(KEY));
assertThat(actualRequest.expectedBucketOwner(), nullValue());

assertFalse(tempFile.exists(), "The temp file has not been deleted.");
}

@Test
void test_uploadedToS3_success() {
Assertions.assertNotNull(localFileBuffer);
assertDoesNotThrow(() -> {
localFileBuffer.flushToS3(s3Client, BUCKET_NAME, KEY);
});

ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class));
PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue();

assertThat(actualRequest, notNullValue());
assertThat(actualRequest.bucket(), equalTo(BUCKET_NAME));
assertThat(actualRequest.key(), equalTo(KEY));
assertThat(actualRequest.expectedBucketOwner(), nullValue());

assertFalse(tempFile.exists(), "The temp file has not been deleted.");
}

@AfterEach
void cleanup() {
tempFile.deleteOnExit();
}

private byte[] generateByteArray() {
byte[] bytes = new byte[1000];
for (int i = 0; i < 1000; i++) {
bytes[i] = (byte) i;
}
return bytes;
}
}

0 comments on commit 9c7b3ec

Please sign in to comment.