Created "s3-sink" plugin. github-issue#1048 #2362

Closed
wants to merge 2 commits into from

Conversation

deepaksahu562
Contributor

@deepaksahu562 deepaksahu562 commented Mar 3, 2023

Description

  • Created "s3-sink" plugin.
  • Configurations for the bucket name, key path and key pattern.
  • The key pattern supports timestamps such as logs-${YYYY.mm}-${uniqueId}.
  • Objects are collected from the buffer and stored in RAM or a local file.

Issues Resolved

GitHub issue: #1048

Check List

  • New functionality "s3-sink" plugin.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@deepaksahu562 deepaksahu562 requested a review from a team as a code owner March 3, 2023 16:29
@@ -0,0 +1,9 @@
Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml
Collaborator:

Please look at other README.md files for the format of this file. And modify this file to be similar to those files.

Contributor Author:

Thanks, @kkondaka, for your review. I have modified the README.md file to match the format of the other existing README files.

/**
* Validate the index with the regular expression pattern. Throws exception if validation fails
*/
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
Collaborator:

Looks like this is similar to the code in the OpenSearchSink code. Could we move this to some common library and use it in both places?

Contributor Author:

Thanks, @kkondaka, for your review. I have moved this code into a common library.
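
For reference, a rough sketch of what such a shared helper could look like (the class name here is hypothetical, and the exact regex and error handling in the common library may differ):

```java
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DateTimePatternUtility {
    // Matches a "${...}" date-time pattern embedded in the configured alias.
    private static final Pattern TIME_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");

    // Returns a formatter for the embedded date pattern, or null when none is present.
    // DateTimeFormatter.ofPattern itself throws IllegalArgumentException for an invalid pattern.
    public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
        final Matcher matcher = TIME_PATTERN.matcher(indexAlias);
        if (!matcher.find()) {
            return null;
        }
        return DateTimeFormatter.ofPattern(matcher.group(1));
    }
}
```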

.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.retryPolicy(RetryPolicy.builder().numRetries(5).build())
Collaborator:

The number of retries should be configurable.

Contributor Author:

Configured in YAML.
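
For illustration, a minimal sketch of what the configurable option could look like (the getter name and default value are assumptions; the max_connection_retries key comes from the README reviewed below):

```java
import com.fasterxml.jackson.annotation.JsonProperty;

public class S3SinkConfig {
    private static final int DEFAULT_CONNECTION_RETRIES = 5;

    // Read from the pipeline YAML; falls back to the default when not configured.
    @JsonProperty("max_connection_retries")
    private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

    public int getMaxConnectionRetries() {
        return maxConnectionRetries;
    }
}
```

The client builder quoted above could then pass this value to RetryPolicy.builder().numRetries(...) instead of the hard-coded 5.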

/**
* Read the event count configuration
*/
public int getEventCount() {
Collaborator:

If you are expecting the event_count to be a multiple of 100, add the check for that here, e.g. if (eventCount % 100 != 0) throw new RuntimeException("...").

Contributor Author:

This code has been refactored; we are not using a multiple of 100 for now. However, a check has been applied.
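
A minimal sketch of such a check (the bound and message are assumptions, not the PR's exact code):

```java
// Reject non-positive values when the configuration is read.
public int getEventCount() {
    if (eventCount <= 0) {
        throw new RuntimeException("event_count must be a positive integer");
    }
    return eventCount;
}
```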

/**
* {@link SinkAccumulator} Accumulate buffer records
*/
public interface SinkAccumulator {
Collaborator:

Where is this being used? The inMemoryAccumulator and localFileAccumulator should implement this interface.

Contributor Author:

Removed this interface; it was not being used.

// For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched.
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}";

// For matching a string enclosed by "%{" and "}".
Collaborator:

The comment says "%{" but the code has "${". If we want to be consistent with the OpenSearch sink, "%{" is more accurate. Not sure what was decided, but at a minimum make sure the comment and the code are consistent. I think it would be better to use "%{", just like the OpenSearch sink, for consistency across different sink configurations. But if there is a good reason to use "${", I am OK.

Contributor Author:

Modified the comment as "%{".

* Validate the index with the regular expression pattern. Throws exception if
* validation fails
*/
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
Collaborator:

I have also previously commented that it might be a good idea to move these to a common API directory so that the same code can be used by the OpenSearchSink code. Changes to OpenSearchSink will not be your responsibility.

Contributor Author:

As per David's suggestion, we have moved it to a common package inside data-prepper-plugins instead of the common API directory.


- `max_connection_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to connect to the S3 client.

- `max_upload_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to ingest data to Amazon S3.
Collaborator:

If there is a default value, it should be specified here.

Contributor Author:

Added the default values.


- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin.

- `max_connection_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to connect to the S3 client.
Collaborator:

If there is a default value, it should be specified here.

Contributor Author:

Added the default values.

temporary_storage: local_file
```

## Configuration
Collaborator:

For all the optional fields, if there is a default value, it should be mentioned.

Contributor Author:

Added the default values.

@@ -0,0 +1,74 @@
/*
Collaborator:

The directory path for this file has dataprepper/plugins/sink/configuration. Is this code not valid for source S3 configurations? Except possibly for "read" vs "write" permissions, I am assuming most of this code would be common for reading from S3 or writing to S3. Correct me if that's not the case.

Contributor Author:

Yes, this code is common for s3-source and s3-sink. Please let us know whether we can move it to a common package under data-prepper-plugins.

Member:

I'm not sure we need to move this presently. I am working on an alternative solution to this duplication.

import jakarta.validation.constraints.NotNull;

/**
* An implementation class of Threshold configuration Options
Collaborator:

Is this comment accurate? It looks like this file has file pattern configuration.

Contributor Author:

Modified the comment based on the respective functionality.

private int eventCount;

@JsonProperty("maximum_size")
private String maximumSize = DEFAULT_BYTE_CAPACITY;
Collaborator:

Does this have min/max values to be checked against?

Contributor Author:

There is no annotation to check the byte-size min and max values. As per David's suggestion, we have used the existing ByteCount, and validation is taken care of in the ByteCount.parse method.
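
For illustration, a sketch of how the getter might lean on ByteCount for validation (the getter name and import path are assumptions based on the data-prepper-api module):

```java
import org.opensearch.dataprepper.model.types.ByteCount;

// ByteCount.parse rejects values that are not valid byte counts (for example "50mb"),
// so no separate min/max annotation is needed on the field.
public long getMaximumSizeInBytes() {
    return ByteCount.parse(maximumSize).getBytes();
}
```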

}

@Test
void test_s3_Sink_plugin_isReady_ture() {
Collaborator:

nit. typo. test_s3_Sink_plugin_isReady_ture -> test_s3_Sink_plugin_isReady_true

Contributor Author:

Rectified as test_s3_sink_plugin_isReady_true

S3SinkWorker worker = new S3SinkWorker(s3Client, s3SinkConfig, codec, codecFileExtension);
BufferAccumulator bufferAccumulator = worker.bufferAccumulator(generateEventQueue());
bufferAccumulator.doAccumulate();
assertNotNull(bufferAccumulator);
Collaborator:

You should verify the contents of the buffer accumulator.

Contributor Author:

doAccumulate() does not have a return type; it only calls localFileProcessor() and inMemoryProcessor().

@dlvenable dlvenable (Member) left a comment:

Thank you for this contribution! I've made a few suggestions to the configurations and to help with some class design.

I hope to take a further look at the S3 sink logic later.


* An implementation class of file pattern configuration Options
*/
public class ObjectOptions {
private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}";
Member:

Please use a default value without spaces. So add a T instead of a space. This is in line with the ISO-8601 standard.

Also, I think we might not want logs in the name. Maybe "events" instead.

events-${yyyy-MM-ddThh:mm:ss}

Contributor Author:

Updated name_pattern as per the ISO-8601 standard, as below.
name_pattern: event-%{yyyy-MM-dd'Thh-mm-ss'}

private String maximumSize = DEFAULT_BYTE_CAPACITY;

@JsonProperty("event_collection_duration")
@DurationMin(seconds = 1)
Member:

Nice use of annotations!

private PluginModel codec;

@JsonProperty("temporary_storage")
private String temporaryStorage = DEFAULT_STORAGE;
Member:

This should be an enum. See the CompressionOption enum for an example.

https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/CompressionOption.java

And here is an example of it being used:

@JsonProperty("compression")
private CompressionOption compression = CompressionOption.NONE;

Contributor Author:

Resolved.

@NotNull
private PluginModel codec;

@JsonProperty("temporary_storage")
Member:

I wonder if there is a better name here. Perhaps buffer_type? Or event_buffer?

Contributor Author:

Updated the code with buffer_type

assertNull(s3SinkConfig.getCodec());
}

private void reflectivelySetField(final S3SinkConfig s3SinkConfig, final String fieldName, final Object value)
Member:

Contributor Author:

As per the updated code, the above reflectivelySetField is not required.

eventData.put("key2", "value");
final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build();
String output = createObjectUnderTest().parse(event);
assertNotNull(output);
Member:

Please also verify the actual object.

I think the best way in this scenario is to have an ObjectMapper in the test. Then you can deserialize the output and verify it has the same data.

Map deserializedData = objectMapper.readValue(output, Map.class);
assertThat(deserializedData.get("key1"), equalTo(value1));
assertThat(deserializedData.get("key2"), equalTo(value2));
// maybe some more assertions?

Contributor Author:

Resolved and added more assertions.

void parse_with_events_outputstream_json_codec(final int numberOfObjects) throws IOException {

final Map<String, String> eventData = new HashMap<>();
eventData.put("key1", "value");
Member:

Please create random and distinct values for both of these keys.

String value1 = UUID.randomUUID().toString();
eventData.put("key1", value1);
String value2 = UUID.randomUUID().toString();
eventData.put("key2", value2);

Contributor Author:

Updated the code as per the above comment.

private String bucketName;

@JsonProperty("key_path_prefix")
private String keyPathPrefix;
Member:

Let's move this property into the ObjectOptions configuration.

Contributor Author:

Moved the path_prefix property to the ObjectKeyOptions under BucketOptions as per latest YAML.

/**
* Store event data into local file
*/
private void localFileProcessor() {
Member:

Using the strategy pattern to decide how to handle local file versus in-memory can help make the code cleaner. You can have independent tests for each strategy.

Contributor Author:

Resolved.
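
For illustration, a rough sketch of the strategy pattern being suggested, reusing the BufferType and InMemoryBuffer names that appear later in this PR (the method signature is an assumption):

```java
import java.util.Collection;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

// Strategy interface: each buffer type decides how events are staged before upload.
public interface BufferType {
    void accumulate(Collection<Record<Event>> records);
}

// In-memory strategy: keeps serialized events in memory until the batch is flushed to S3.
class InMemoryBuffer implements BufferType {
    private final StringBuilder stagedEvents = new StringBuilder();

    @Override
    public void accumulate(final Collection<Record<Event>> records) {
        for (final Record<Event> record : records) {
            stagedEvents.append(record.getData().toJsonString()).append(System.lineSeparator());
        }
    }
}
```

A LocalFileBuffer could implement the same interface and write to a temporary file instead, which lets each strategy be unit tested on its own.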


private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
Collaborator:

I am not sure about this. @dlvenable could you clarify?


if (retryCount == 0) {
LOG.warn("Maximum retry count reached, Unable to store {} into amazon S3 bucket", s3ObjectFileName);
break;
Collaborator:

You should return false here; otherwise the "unable to upload" message would be printed twice.

Contributor Author:

Resolved.
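
A sketch of the fix being discussed (the upload helper and logger are assumed to come from the surrounding worker class):

```java
// Return false as soon as retries are exhausted so the failure is logged exactly once.
private boolean uploadToS3WithRetries(final String s3ObjectFileName, int retryCount) {
    while (retryCount > 0) {
        try {
            putObjectToS3(s3ObjectFileName); // hypothetical upload helper on the worker class
            return true;
        } catch (final Exception e) {
            retryCount--;
            if (retryCount == 0) {
                LOG.warn("Maximum retry count reached, unable to store {} into Amazon S3 bucket", s3ObjectFileName);
                return false;
            }
        }
    }
    return false;
}
```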

* S3 {@link ObjectKeyOptions} configuration Options.
*/
public ObjectKeyOptions getObjectKeyOptions() {
if (objectKeyOptions == null)
Collaborator:

This looks wrong. I think you meant

if (objectKeyOptions == null) {
    objectKeyOptions = new ObjectKeyOptions();
}
return objectKeyOptions;

Contributor Author:

Resolved.

s3SinkService = mock(S3SinkService.class);
Collection<Record<Event>> records = generateRandomStringEventRecord();
s3SinkService.accumulateBufferEvents(worker);
verify(s3SinkService, atLeastOnce()).accumulateBufferEvents(worker);
Collaborator:

I am not sure if this is the right check. accumulateBufferEvents is called above, and verifying that it is called at least once here seems unnecessary. You have to verify what logic took place inside accumulateBufferEvents and whether it happened correctly. Please validate the functionality of the functions involved.

Contributor Author:

Verified the logic of accumulateBufferEvents.

s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Collaborator:

Please validate if doOutput() worked as expected.

Member:

Yes, this is very important.

You should be able to perform the following:

InOrder inOrder = inOrder(s3SinkService);
inOrder.verify(s3SinkService).processRecords(records);
inOrder.verify(s3SinkService).accumulateBufferEvents(sinkWorker);

Contributor Author:

Updated test-cases using inOrder and verified.

Collaborator:

Have you uploaded new diffs?

Contributor Author:

Resolved.

/**
* Class responsible for creation of s3 key pattern based on date time stamp
*/
public class S3ObjectIndex {
Member:

It seems this code was moved along with items that are probably beyond the scope of what it should include.

The OpenSearch sink deals with indexes. The S3 sink will deal with objects. Indexes and objects have different constraints and there is no relationship between the two.

The common code is the ability to parse the date-time information. This is the part which can be made into a common utility.

Here is some pseudo-code for how this might look.

OpenSearch:

String unparsedIndexName = getIndexName();  // This might look like logs-${mm.dd}
String parsedIndexName = DestinationExpressionEvaluator.parseExpression(unparsedIndexName);  // Now this looks like logs-04.04
validateIndexName(parsedIndexName);
createIndex(parsedIndexName);

S3 sink

String unparsedObjectName = getObject();  // This might look like logs-${mm}-${dd}.json
String parsedObjectName = DestinationExpressionEvaluator.parseExpression(unparsedObjectName);  // Now this looks like logs-04-04.json
validateS3ObjectName(parsedObjectName);
createObject(parsedObjectName);

Contributor Author:

Modified the function names to be based on the date-time stamp of the object.

boolean isFileUploadedToS3 = Boolean.FALSE;
String s3ObjectPath = buildingPathPrefix(s3SinkConfig);
String s3ObjectFileName = S3ObjectIndex.getIndexAliasWithDate(
s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern()) + "." + codecFileExtension;
Member:

I think the name should be WYSIWYG.

So the pipeline author needs to specify the .${extension}. You can then expand that variable - ${extension} into the correct value.

Contributor Author:

Modified the code as below:
The S3 sink object name extension will be based on the user-provided extension in "name_pattern".
The default file extension is json if the user does not provide any extension.

* An implementation class of path prefix and file pattern configuration Options
*/
public class ObjectKeyOptions {
private static final String DEFAULT_FILE_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
Member:

Should this be events-%{yyyy-MM-dd'T'hh-mm-ss}-%{epochSeconds}-%{random}.%{extension}?

Either that, or we should rename the property to be clearer that this is only part of the name.

Contributor Author:

Modified the property name from DEFAULT_FILE_PATTERN to DEFAULT_NAME_PATTERN.


/**
* Interface for building buffer types.
*/
public interface BufferType {
Member:

This class should be unit tested.

Contributor Author:

Added JUnit test cases for the BufferType interface.

@Test
void test_in_memoryAccumulate_with_s3Upoload_success() throws InterruptedException {
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3Client s3Client = s3SinkService.createS3Client();
Member:

You should use a mock object here. This will let you verify the interactions with the S3Client.

Contributor Author:

Mocked the s3SinkService and verified the interactions with the S3Client.
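
For illustration, a minimal Mockito sketch of verifying the interaction with a mocked S3Client (the bucket, key, and body values are placeholders, and the call is shown inline where the accumulator under test would normally make it):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

final S3Client s3Client = mock(S3Client.class);

// The accumulator under test would issue this call internally.
s3Client.putObject(PutObjectRequest.builder().bucket("test-bucket").key("test-key").build(),
        RequestBody.fromString("{\"message\":\"hello\"}"));

// Verify that exactly one object upload was issued against the mocked client.
verify(s3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class));
```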

@Test
void test_local_file_accumulate_with_s3Upoload_success() throws InterruptedException {
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3Client s3Client = s3SinkService.createS3Client();
Member:

Again, this should be a mocked object. You can then verify the interactions.

Contributor Author:

Mocked the s3SinkService and verified the interactions.

* @throws InterruptedException
*/
public boolean inMemoryAccumulate(final NavigableSet<String> bufferedEventSet) throws InterruptedException {
String s3ObjectPath = buildingPathPrefix(s3SinkConfig);
Member:

Some of this code is duplicated with the LocalFileBuffer. This indicates there should be some refactoring.

Additionally, the decision for building the path is completely independent of the buffering logic. It should probably be moved into its own class which can get the whole object path. Such a small, focused class would be quite easy to unit test as well.

Contributor Author:

Added a new class, ObjectKey, for the object path construction functionality.
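
For illustration, a sketch of what such a small, focused class could look like (the exact method signatures in the PR may differ):

```java
// Builds the full S3 object key from the configured path prefix and the generated file name.
public class ObjectKey {

    private ObjectKey() {
    }

    public static String buildObjectKey(final String pathPrefix, final String objectFileName) {
        if (pathPrefix == null || pathPrefix.isEmpty()) {
            return objectFileName;
        }
        return pathPrefix.endsWith("/") ? pathPrefix + objectFileName : pathPrefix + "/" + objectFileName;
    }
}
```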

@dlvenable dlvenable self-assigned this Apr 12, 2023

String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
assertNotEquals(actualIndex.toString(), expectedIndex.toString());
Collaborator:

Why is this assertNotEquals()?

Contributor Author:

As per the requirement, epoch seconds and a unique ID are appended to the name_pattern.
assertNotEquals is used since the two responses are not equal.

Collaborator:

You mean it would be the expected string + epoch seconds + unique ID? In that case, can we check that the expected string is contained in actualIndex.toString()?
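
For example, a hedged sketch of that check using Hamcrest (the method name is taken from the test quoted above):

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;

String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
// The fixed prefix is deterministic even though the appended epoch seconds and unique ID are not.
assertThat(actualIndex, startsWith("events-"));
```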

void test_call_createS3Client() {
S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
S3Client s3Client = s3SinkService.createS3Client();
assertNotNull(s3Client);
Collaborator:

You need to assert more here to make sure s3SinkService is created properly and s3Client is created with the expected config.

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.

s3SinkService = mock(S3SinkService.class);
Collection<Record<Event>> records = generateRandomStringEventRecord();
s3SinkService.processRecords(records);
verify(s3SinkService, atLeastOnce()).processRecords(records);
Collaborator:

You need to assert that processRecords worked as expected. In this case, maybe you can check that the event queue has the expected number of records in it. You can add a protected API to S3SinkService to return the size of eventQueue to do this.

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.
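
For reference, a sketch of the protected accessor being suggested (its placement inside S3SinkService is assumed):

```java
// Test-only hook: exposes the current depth of the internal event queue.
protected int getEventQueueSize() {
    return eventQueue.size();
}
```

A test could then call processRecords(records) and assert that the returned size equals records.size().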

S3SinkWorker worker = mock(S3SinkWorker.class);
service.processRecords(records);
service.accumulateBufferEvents(worker);
verify(worker, only()).bufferAccumulator(any(BlockingQueue.class));
Collaborator:

Same here. You can check that the eventQueue is empty after doing accumulateBufferEvents.

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.

s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(new ArrayList<>());
assertNotNull(s3Sink);
Collaborator:

Please do not do this here. s3Sink is created on line 109, and even if it were null, it would have crashed on line 110. You should verify via the metrics that s3Sink actually output 0 records here.

Contributor Author:

Resolved.

s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Collaborator:

Same here.

Contributor Author:

Resolved.

s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Collaborator:

Have you uploaded new diffs?


private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
Member:

It appears that you are putting a hard-coded restriction on the number of events to 100,000 here. This will put an upper limit on the ArrayBlockingQueue.

I believe this will conflict with some user configuration values.

See ThresholdOptions line 24, where we allow a maximum event count of 10,000,000. So if the user sets this value, won't the ArrayBlockingQueue fill up?

Also, what if the user is using bytes instead and it exceeds this value of 100,000?

private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
private BlockingQueue<Event> eventQueue;
Member:

Why are we using a BlockingQueue here?

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.

"Snapshot info : Byte_capacity = {} Bytes, Event_count = {} Records & Event_collection_duration = {} Sec",
byteCount, eventCount, eventCollectionDuration);

if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
Member:

Every time the Data Prepper pipeline sends events to the sink, it calls doOutput on the S3Sink class. This in turn, adds events to a BlockingQueue and then calls this method.

This method then takes events from the BlockingQueue and adds them to a new NavigableSet. After that, it creates the buffer (local-file or in-memory). Then it calls the "accumulate" methods.

These methods then immediately send to S3. So there is no buffer accumulation. The idea behind the accumulation is to keep data from multiple calls to doOutput. Otherwise, the user may get very small S3 objects.

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.


StringBuilder eventBuilder = new StringBuilder();
for (String event : bufferedEventSet) {
eventBuilder.append(event);
Member:

Please provide event.toJsonString() here.

Contributor Author:

We are already applying toJsonString() in the jsonCodec parse method before control comes to this file.

File file = new File(s3ObjectFileName);
try (BufferedWriter eventWriter = new BufferedWriter(new FileWriter(s3ObjectFileName))) {
for (String event : bufferedEventSet) {
eventWriter.write(event);
Member:

Please provide event.toJsonString() here.

Contributor Author:

We are already applying toJsonString() in the jsonCodec parse method before control comes to this file.

public void bufferAccumulator(BlockingQueue<Event> eventQueue) {
boolean isFileUploadedToS3 = Boolean.FALSE;
DB eventDb = null;
NavigableSet<String> bufferedEventSet = null;
Member:

Why is this a NavigableSet? I'm not even sure you need a collection.

I suggested elsewhere that you pass in a List<Event> to this method. Just provide that List to the accumulators. Those accumulators can then iterate over the lists and add each Event into their respective buffers.

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.

try {
StopWatch watch = new StopWatch();
watch.start();
eventDb = DBMaker.memoryDB().make();
Member:

Why do we have an eventDb here?

Contributor Author:

These changes will be addressed as incremental changes, and they will be available in a separate PR.

*/
public enum BufferTypeOptions {

INMEMORY("in_memory", new InMemoryBuffer()),
Member:

These should be IN_MEMORY and LOCAL_FILE. You can use your IDE to rename these easily.

Contributor Author:

Followed the same notation (lower-case) as the s3-source CompressionOption.
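
For illustration, a minimal sketch of the enum following the CompressionOption pattern linked above (the fromOptionValue helper is an assumption; the quoted code also attaches a buffer instance to each constant):

```java
import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum BufferTypeOptions {
    INMEMORY("in_memory"),
    LOCALFILE("local_file");

    private static final Map<String, BufferTypeOptions> OPTIONS_MAP = Arrays.stream(values())
            .collect(Collectors.toMap(value -> value.option, value -> value));

    private final String option;

    BufferTypeOptions(final String option) {
        this.option = option;
    }

    // Lets Jackson map the lower-case YAML value (for example "in_memory") onto the enum constant.
    @JsonCreator
    static BufferTypeOptions fromOptionValue(final String option) {
        return OPTIONS_MAP.get(option);
    }
}
```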

* An implementation class of path prefix and file pattern configuration Options
*/
public class ObjectKeyOptions {
private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
Member:

This value should be:

events-%{yyyy-MM-dd'T'hh-mm-ss}-${epochSeconds}-${uniqueId}.${extension}

I think there may be some broad issues here about format strings as well.

So my suggestion is that, for now, we remove the name_pattern option from customers. We can consider making this configurable later. For this PR, please always create the pattern as follows.

events-%{yyyy-MM-dd'T'hh-mm-ss}-${epochSeconds}-${uniqueId}.${extension}

The epochSeconds will be the same time as the string in yyyy-MM-dd'T'hh-mm-ss, but represented as seconds since the 1970 epoch. The unique ID is generated once per Data Prepper instance. And the extension comes from the codec.

Contributor Author:

Resolved.

private String pathPrefix;

@JsonProperty("name_pattern")
private String namePattern = DEFAULT_OBJECT_NAME_PATTERN;
Member:

As noted above, let's not make this configurable for now.

Contributor Author:

Resolved.

@JsonProperty("path_prefix")
private String pathPrefix;

@JsonProperty("name_pattern")
@dlvenable dlvenable (Member) commented Apr 18, 2023:

Let's not expose this as a feature for pipeline authors yet.

  1. Remove these two lines.
@JsonProperty("name_pattern")
private String namePattern = DEFAULT_OBJECT_NAME_PATTERN;
  2. Change getNamePattern to this:
public String getNamePattern() {
  return DEFAULT_OBJECT_NAME_PATTERN;
}

These changes should remove this as a user configuration. And you won't have to change the code that uses this.

Contributor Author:

Resolved.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>