Created "s3-sink" plugin. github-issue#1048 #2362
Conversation
@@ -0,0 +1,9 @@
Attached simple-sample-pipeline: opensearch-data-prepper\data-prepper\data-prepper-plugins\s3-sink\src\main\resources\pipelines.yaml
Please look at other README.md files for the format of this file. And modify this file to be similar to those files.
Thanks, @kkondaka, for your review. I have modified the README.md file to match the format of the other existing files.
/**
 * Validate the index with the regular expression pattern. Throws exception if validation fails
 */
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
Looks like this is similar to the code in the OpenSearchSink code. Could we move this to some common library and use it in both places?
Thanks, @kkondaka, for your review. I have moved this code into a common library.
.region(s3SinkConfig.getAwsAuthenticationOptions().getAwsRegion())
.credentialsProvider(s3SinkConfig.getAwsAuthenticationOptions().authenticateAwsConfiguration())
.overrideConfiguration(ClientOverrideConfiguration.builder()
        .retryPolicy(RetryPolicy.builder().numRetries(5).build())
The number of retries should be configurable.
It is now configurable in the pipeline YAML.
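For reference, such settings might surface in the pipeline YAML roughly like this (a sketch; the key names max_connection_retries and max_upload_retries come from the README lines quoted later in this review, and the values shown are illustrative):

```
sink:
  - s3:
      max_connection_retries: 5
      max_upload_retries: 5
```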
/**
 * Read the event count configuration
 */
public int getEventCount() {
If you are expecting the event_count to be a multiple of 100, add the check for that here, e.g. if (eventCount % 100 != 0) throw new RuntimeException("...").
This code has been refactored; we are no longer requiring a multiple of 100. However, a validation check has been applied.
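The kind of check the reply refers to might look roughly like this (a sketch, not the actual refactored code; in the real plugin the field would be bound via @JsonProperty("event_count")):

```java
public class ThresholdOptionsSketch {
    // Stand-in for the Jackson-bound event_count field.
    private final int eventCount;

    public ThresholdOptionsSketch(final int eventCount) {
        this.eventCount = eventCount;
    }

    /** Returns the configured event count, rejecting non-positive values. */
    public int getEventCount() {
        if (eventCount <= 0) {
            throw new IllegalArgumentException("event_count must be a positive integer");
        }
        return eventCount;
    }

    public static void main(String[] args) {
        System.out.println(new ThresholdOptionsSketch(200).getEventCount());
    }
}
```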
/**
 * {@link SinkAccumulator} Accumulate buffer records
 */
public interface SinkAccumulator {
Where is this being used? The inMemoryAccumulator and localFileAccumulator should implement this interface.
Removed this interface, as it was not being used.
// For a string like "data-prepper-${yyyy-MM-dd}", "${yyyy-MM-dd}" is matched.
private static final String TIME_PATTERN_REGULAR_EXPRESSION = "\\$\\{.*?\\}";

// For matching a string enclosed by "%{" and "}".
The comment says "%{" but the code has "${". If we want to be consistent with OpenSearch Sink, "%{" is more accurate. Not sure what was decided, but at a minimum make sure the comment and the code are consistent. I think it would be better to use "%{", just like OpenSearch Sink, for consistency across different sink configurations. But if there is a good reason to use "${", I am OK.
Modified the comment to use "%{".
 * Validate the index with the regular expression pattern. Throws exception if
 * validation fails
 */
public static DateTimeFormatter getDatePatternFormatter(final String indexAlias) {
I have also previously commented that it might be a good idea to move these to a common api directory so that the same code can be used by the OpenSearchSink code. Changes to OpenSearchSink will not be your responsibility.
As per David's suggestion, we have moved it to a common package inside data-prepper-plugins instead of the common api directory.
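As a rough illustration (not the actual shared code), the extracted helper might look like the following, assuming the %{...} delimiters discussed elsewhere in this review; the class name is illustrative:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class DatePatternSketch {
    // Matches a date-time pattern enclosed by "%{" and "}".
    private static final Pattern TIME_PATTERN = Pattern.compile("%\\{(.*?)\\}");

    private DatePatternSketch() { }

    /**
     * Returns a formatter for the pattern embedded in the alias, or null when
     * the alias carries no %{...} section. DateTimeFormatter.ofPattern throws
     * on invalid patterns, which serves as the validation step.
     */
    public static DateTimeFormatter getDatePatternFormatter(final String alias) {
        final Matcher matcher = TIME_PATTERN.matcher(alias);
        return matcher.find() ? DateTimeFormatter.ofPattern(matcher.group(1)) : null;
    }

    public static void main(String[] args) {
        DateTimeFormatter formatter = getDatePatternFormatter("events-%{yyyy-MM-dd}");
        System.out.println(formatter.format(LocalDateTime.of(2023, 4, 4, 12, 0)));
    }
}
```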
- `max_connection_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to connect the S3 client.

- `max_upload_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to ingest data to Amazon S3.
If there is a default value, it should be specified here.
Added the default values.
- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin.

- `max_connection_retries` (Optional) : An integer value indicating the maximum number of times a single request should be retried in order to connect the S3 client.
If there is a default value, it should be specified here.
Added the default values.
temporary_storage: local_file
```

## Configuration
For all the optional fields, if there is a default value, it should be mentioned.
Added the default values.
@@ -0,0 +1,74 @@
/*
The directory path for this file has dataprepper/plugins/sink/configuration. Is this code not valid for S3 source configurations? Except possibly for "read" vs "write" permissions, I am assuming most of this code would be common for reading from or writing to S3. Correct me if that's not the case.
Yes, this code is common for s3-source and s3-sink. Please let us know whether we can move it to a common package under data-prepper-plugins.
I'm not sure we need to move this presently. I am working on an alternative solution to this duplication.
import jakarta.validation.constraints.NotNull;

/**
 * An implementation class of Threshold configuration Options
Is this comment accurate? It looks like this file has file pattern configuration.
Modified the comment based on the respective functionality.
private int eventCount;

@JsonProperty("maximum_size")
private String maximumSize = DEFAULT_BYTE_CAPACITY;
Does this have min/max values to be checked against?
There is no annotation to check the byte-size min and max. As per David's suggestion, we have used the existing ByteCount, and validation is handled in the ByteCount.parse method.
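For illustration only, here is a simplified version of the kind of parsing and validation a byte-count helper performs (a sketch, not Data Prepper's actual ByteCount implementation):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ByteSizeSketch {
    // Accepts values like "512b", "50kb", "10mb", "1gb".
    private static final Pattern BYTE_PATTERN = Pattern.compile("(\\d+)(b|kb|mb|gb)");

    /** Parses strings like "50kb" into a byte count; rejects malformed input. */
    static long parseBytes(final String value) {
        final Matcher matcher = BYTE_PATTERN.matcher(value.trim().toLowerCase());
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid byte size: " + value);
        }
        long bytes = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "gb": bytes *= 1024L; // fall through
            case "mb": bytes *= 1024L; // fall through
            case "kb": bytes *= 1024L;
            default: break;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("50kb"));
    }
}
```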
}

@Test
void test_s3_Sink_plugin_isReady_ture() {
nit: typo. test_s3_Sink_plugin_isReady_ture -> test_s3_Sink_plugin_isReady_true
Rectified as test_s3_sink_plugin_isReady_true
S3SinkWorker worker = new S3SinkWorker(s3Client, s3SinkConfig, codec, codecFileExtension);
BufferAccumulator bufferAccumulator = worker.bufferAccumulator(generateEventQueue());
bufferAccumulator.doAccumulate();
assertNotNull(bufferAccumulator);
You should verify the contents of the buffer accumulator.
doAccumulate() does not have a return value; it just calls localfileProcessor() and inMemoryProcessor().
Thank you for this contribution! I've made a few suggestions to the configurations and to help with some class design.
I hope to take a further look at the S3 sink logic later.
 * An implementation class of file pattern configuration Options
 */
public class ObjectOptions {
    private static final String DEFAULT_KEY_PATTERN = "logs-${yyyy-MM-dd hh:mm:ss}";
Please use a default value without spaces, so add a T instead of a space. This is in line with the ISO-8601 standard. Also, I think we might not want logs in the name. Maybe "events" instead: events-${yyyy-MM-ddThh:mm:ss}
Updated name_pattern as per the ISO-8601 standard, as below:
name_pattern: event-%{yyyy-MM-dd'T'hh-mm-ss}
private String maximumSize = DEFAULT_BYTE_CAPACITY;

@JsonProperty("event_collection_duration")
@DurationMin(seconds = 1)
Nice use of annotations!
private PluginModel codec;

@JsonProperty("temporary_storage")
private String temporaryStorage = DEFAULT_STORAGE;
This should be an enum. See the CompressionOption enum for an example. And here is an example of it being used (lines 29 to 30 in 7bd967a):
@JsonProperty("compression")
private CompressionOption compression = CompressionOption.NONE;
Resolved.
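A minimal sketch of the enum-backed approach, modeled on the CompressionOption pattern the reviewer points to (the in_memory and local_file values come from this review's own discussion; the class and method names are illustrative):

```java
import java.util.Arrays;

public class BufferTypeSketch {
    enum BufferTypeOptions {
        IN_MEMORY("in_memory"),
        LOCAL_FILE("local_file");

        private final String option;

        BufferTypeOptions(final String option) {
            this.option = option;
        }

        /** Resolves a YAML string value to its enum constant, as Jackson would via a @JsonCreator. */
        static BufferTypeOptions fromOptionValue(final String value) {
            return Arrays.stream(values())
                    .filter(o -> o.option.equals(value))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown buffer type: " + value));
        }
    }

    public static void main(String[] args) {
        System.out.println(BufferTypeOptions.fromOptionValue("local_file"));
    }
}
```

An enum rejects unknown values at configuration-load time instead of failing later inside the sink.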
@NotNull
private PluginModel codec;

@JsonProperty("temporary_storage")
I wonder if there is a better name here. Perhaps buffer_type? Or event_buffer?
Updated the code to use buffer_type.
assertNull(s3SinkConfig.getCodec());
}

private void reflectivelySetField(final S3SinkConfig s3SinkConfig, final String fieldName, final Object value)
Data Prepper has a utility for this. Please use this instead.
As per the updated code, the above reflectivelySetField is not required.
eventData.put("key2", "value");
final JacksonEvent event = JacksonLog.builder().withData(eventData).withEventType("LOG").build();
String output = createObjectUnderTest().parse(event);
assertNotNull(output);
Please also verify the actual object. I think the best way in this scenario is to have an ObjectMapper in the test. Then you can deserialize the output and verify it has the same data.
Map deserializedData = objectMapper.readValue(output, Map.class);
assertThat(deserializedData.get("key1"), equalTo(value1));
assertThat(deserializedData.get("key2"), equalTo(value2));
// maybe some more assertions?
Resolved and added more assertions.
void parse_with_events_outputstream_json_codec(final int numberOfObjects) throws IOException {

    final Map<String, String> eventData = new HashMap<>();
    eventData.put("key1", "value");
Please create random and distinct values for both of these keys.
String value1 = UUID.randomUUID().toString();
eventData.put("key1", value1);
String value2 = UUID.randomUUID().toString();
eventData.put("key2", value2);
Updated the code as per the above comment.
private String bucketName;

@JsonProperty("key_path_prefix")
private String keyPathPrefix;
Let's move this property into the ObjectOptions configuration.
Moved the path_prefix property to the ObjectKeyOptions under BucketOptions as per the latest YAML.
/**
 * Store event data into local file
 */
private void localFileProcessor() {
Using the strategy pattern to decide how to handle local file versus in-memory can help make the code cleaner. You can have independent tests for each strategy.
Resolved.
private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
I am not sure about this. @dlvenable could you clarify?
if (retryCount == 0) {
    LOG.warn("Maximum retry count reached, Unable to store {} into amazon S3 bucket", s3ObjectFileName);
    break;
You should do return false here, otherwise the "unable to upload" message would be printed twice.
Resolved.
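The shape of the fix, stripped of the AWS SDK details, is roughly the following (a sketch; the real method's signature differs):

```java
import java.util.function.Supplier;

public class RetrySketch {
    /** Attempts the upload up to maxRetries times; returns false once retries are exhausted. */
    static boolean uploadWithRetries(final Supplier<Boolean> upload, final int maxRetries) {
        for (int retryCount = maxRetries; retryCount > 0; retryCount--) {
            if (upload.get()) {
                return true;
            }
        }
        // Returning here (rather than break-ing out of the loop) means the
        // "unable to upload" failure is reported exactly once by the caller.
        return false;
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // Simulated upload: fails twice, then succeeds on the third attempt.
        boolean uploaded = uploadWithRetries(() -> ++attempts[0] >= 3, 5);
        System.out.println(uploaded + " after " + attempts[0] + " attempts");
    }
}
```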
 * S3 {@link ObjectKeyOptions} configuration Options.
 */
public ObjectKeyOptions getObjectKeyOptions() {
    if (objectKeyOptions == null)
This looks wrong. I think you meant
if (objectKeyOptions == null) {
objectKeyOptions = new ObjectKeyOptions();
}
return objectKeyOptions;
Resolved.
s3SinkService = mock(S3SinkService.class);
Collection<Record<Event>> records = generateRandomStringEventRecord();
s3SinkService.accumulateBufferEvents(worker);
verify(s3SinkService, atLeastOnce()).accumulateBufferEvents(worker);
I am not sure if this is the right check. accumulateBufferEvents is called above, and verifying that it was called at least once here seems unnecessary. You have to verify what logic took place inside accumulateBufferEvents and whether it happened correctly. Please validate the functionality of the functions involved.
Verified the logic of accumulateBufferEvents.
s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Please validate that doOutput() worked as expected.
Yes, this is very important.
You should be able to perform the following:
InOrder inOrder = inOrder(s3SinkService);
inOrder.verify(s3SinkService).processRecords(records);
inOrder.verify(s3SinkService).accumulateBufferEvents(sinkWorker);
Updated test-cases using inOrder and verified.
Have you uploaded new diffs?
Resolved.
/**
 * Class responsible for creation of s3 key pattern based on date time stamp
 */
public class S3ObjectIndex {
It seems this code was moved along with items that are probably beyond the scope of what it should include.
The OpenSearch sink deals with indexes. The S3 sink will deal with objects. Indexes and objects have different constraints and there is no relationship between the two.
The common code is the ability to parse the date-time information. This is the part which can be made into a common utility.
Here is some pseudo-code for how this might look.
OpenSearch:
String unparsedIndexName = getIndexName(); // This might look like logs-${mm.dd}
String parsedIndexName = DestinationExpressionEvaluator.parseExpression(unparsedIndexName); // Now this looks like logs-04.04
validateIndexName(parsedIndexName);
createIndex(parsedIndexName);
S3 sink:
String unparsedObjectName = getObject(); // This might look like logs-${mm}-${dd}.json
String parsedObjectName = DestinationExpressionEvaluator.parseExpression(unparsedObjectName); // Now this looks like logs-04-04.json
validateS3ObjectName(parsedObjectName);
createObject(parsedObjectName);
Modified the functions name based on the date time stamp of the object.
boolean isFileUploadedToS3 = Boolean.FALSE;
String s3ObjectPath = buildingPathPrefix(s3SinkConfig);
String s3ObjectFileName = S3ObjectIndex.getIndexAliasWithDate(
        s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern()) + "." + codecFileExtension;
I think the name should be WYSIWYG. So the pipeline author needs to specify the .${extension}. You can then expand that variable, ${extension}, into the correct value.
Modified the code as below:
The S3 sink object name extension is now based on the user-provided extension in "name_pattern".
The default file extension is json if the user does not provide any extension.
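A hedged sketch of that defaulting rule (an illustrative helper, not the PR's actual code; it assumes any dot in the expanded object name marks an extension):

```java
public class ExtensionSketch {
    /** Appends the default ".json" extension only when the object name carries none. */
    static String withDefaultExtension(final String objectName) {
        return objectName.contains(".") ? objectName : objectName + ".json";
    }

    public static void main(String[] args) {
        System.out.println(withDefaultExtension("events-2023-04-04T10-15-30"));
        System.out.println(withDefaultExtension("events-2023-04-04.log"));
    }
}
```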
 * An implementation class of path prefix and file pattern configuration Options
 */
public class ObjectKeyOptions {
    private static final String DEFAULT_FILE_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
Should this be events-%{yyyy-MM-dd'T'hh-mm-ss}-%{epochSeconds}-%{random}.%{extension}? Either that, or we should rename the property to be clearer that this is only part of the name.
Modified the property name from DEFAULT_FILE_PATTERN to DEFAULT_NAME_PATTERN.
/**
 * Interface for building buffer types.
 */
public interface BufferType {
This class should be unit tested.
Added JUnit test cases for the BufferType interface.
@Test
void test_in_memoryAccumulate_with_s3Upoload_success() throws InterruptedException {
    S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
    S3Client s3Client = s3SinkService.createS3Client();
You should use a mock object here. This will let you verify the interactions with the S3Client.
Mocked the s3SinkService and verified the interactions with the S3Client.
@Test
void test_local_file_accumulate_with_s3Upoload_success() throws InterruptedException {
    S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
    S3Client s3Client = s3SinkService.createS3Client();
Again, this should be a mocked object. You can then verify the interactions.
Mocked the s3SinkService and verified the interactions.
 * @throws InterruptedException
 */
public boolean inMemoryAccumulate(final NavigableSet<String> bufferedEventSet) throws InterruptedException {
    String s3ObjectPath = buildingPathPrefix(s3SinkConfig);
Some of this code is duplicated with the LocalFileBuffer. This indicates there should be some refactoring.
Additionally, the decision for building the path is completely independent of the buffering logic. It should probably be moved into its own class which can get the whole object path. Such a small, focused class would be quite easy to unit test as well.
Added a new class, ObjectKey, for the object path construction functionality.
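Such a small, focused class can be sketched like this (names are illustrative; the real ObjectKey class in the PR may differ):

```java
public class ObjectKeySketch {
    /** Joins a path prefix and object name, tolerating empty or slash-terminated prefixes. */
    static String buildObjectKey(final String pathPrefix, final String objectName) {
        if (pathPrefix == null || pathPrefix.isEmpty()) {
            return objectName;
        }
        return pathPrefix.endsWith("/") ? pathPrefix + objectName : pathPrefix + "/" + objectName;
    }

    public static void main(String[] args) {
        System.out.println(buildObjectKey("logs/2023/", "events-1.json"));
        System.out.println(buildObjectKey(null, "events-1.json"));
    }
}
```

Because the path-building decision is independent of the buffering logic, a class like this is trivial to unit test on its own.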
String expectedIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
String actualIndex = S3ObjectIndex.getObjectNameWithDateTimeId("events-%{yyyy-MM-dd}");
assertNotEquals(actualIndex.toString(), expectedIndex.toString());
Why is this assertNotEquals()?
As per the requirement, epoch seconds and a unique id are appended to the name_pattern. assertNotEquals is used since the two results are never equal.
You mean it would be the expected String + epoch seconds + unique id? In that case, can we check that the expected String is contained in actualIndex.toString()?
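The suggested assertion can be illustrated with a self-contained sketch (the name-building logic here is a stand-in for S3ObjectIndex, assuming epoch seconds and a UUID are appended to the expanded pattern):

```java
import java.util.UUID;

public class UniqueNameSketch {
    /** Stand-in for S3ObjectIndex: appends epoch seconds and a unique id to the expanded pattern. */
    static String objectNameWithDateTimeId(final String expandedPrefix) {
        return expandedPrefix + "-" + (System.currentTimeMillis() / 1000) + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = objectNameWithDateTimeId("events-2023-04-04");
        String second = objectNameWithDateTimeId("events-2023-04-04");
        // Two calls never collide, yet both contain the deterministic prefix,
        // which is the stronger assertion suggested in the review.
        System.out.println(!first.equals(second)
                && first.startsWith("events-2023-04-04-")
                && second.startsWith("events-2023-04-04-"));
    }
}
```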
void test_call_createS3Client() {
    S3SinkService s3SinkService = new S3SinkService(s3SinkConfig);
    S3Client s3Client = s3SinkService.createS3Client();
    assertNotNull(s3Client);
You need to assert() more here to make sure s3SinkService is created properly and s3Client is properly created with the expected config.
These changes will be addressed as incremental changes, and they will be available in a separate PR.
s3SinkService = mock(S3SinkService.class);
Collection<Record<Event>> records = generateRandomStringEventRecord();
s3SinkService.processRecords(records);
verify(s3SinkService, atLeastOnce()).processRecords(records);
You need to assert that processRecords worked as expected. In this case, maybe you can check that the event queue has the expected number of records in it. You could add a protected API to S3SinkService that returns the size of eventQueue to do this.
These changes will be addressed as incremental changes, and they will be available in a separate PR.
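The suggested check could look like this in isolation (a sketch using a plain ArrayBlockingQueue in place of the sink's internal event queue; getEventQueueSize is the hypothetical protected accessor the reviewer proposes):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSizeSketch {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(100);

    /** Stand-in for processRecords: enqueue each incoming record. */
    void processRecords(final Collection<String> records) {
        eventQueue.addAll(records);
    }

    /** The hypothetical protected accessor suggested in the review. */
    protected int getEventQueueSize() {
        return eventQueue.size();
    }

    public static void main(String[] args) {
        QueueSizeSketch service = new QueueSizeSketch();
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            records.add("record-" + i);
        }
        service.processRecords(records);
        // The queue should now hold exactly the number of records processed.
        System.out.println(service.getEventQueueSize());
    }
}
```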
S3SinkWorker worker = mock(S3SinkWorker.class);
service.processRecords(records);
service.accumulateBufferEvents(worker);
verify(worker, only()).bufferAccumulator(any(BlockingQueue.class));
Same here. You can check that the eventQueue is empty after doing accumulateBufferEvents.
These changes will be addressed as incremental changes, and they will be available in a separate PR.
s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(new ArrayList<>());
assertNotNull(s3Sink);
Please do not do this here. s3Sink is created on line 109, and even if it were null, it would have crashed on line 110. You should instead verify via metrics that s3Sink actually output 0 records here.
Resolved.
s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Same here.
Resolved.
s3Sink = new S3Sink(pluginSetting, s3SinkConfig, pluginFactory);
s3Sink.doInitialize();
s3Sink.doOutput(generateRandomStringEventRecord());
assertNotNull(s3Sink);
Have you uploaded new diffs?
private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
It appears that you are putting a hard-coded restriction of 100,000 on the number of events here. This will put an upper limit on the `ArrayBlockingQueue`.

I believe this will conflict with some user configuration values. See `ThresholdOptions` line 24, where we allow a maximum event count of 10,000,000. So if the user sets that value, won't the `ArrayBlockingQueue` fill up? Also, what if the user is using bytes instead and it exceeds this value of 100,000?
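One way to resolve the conflict the reviewer describes is to size the queue from the user's configured threshold instead of a hard-coded constant. The sketch below assumes a `getEventCount()` accessor along the lines of the existing `ThresholdOptions` class (the stand-in class here is illustrative, not the plugin's real config type):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueSizingSketch {

    // Stand-in for the relevant part of ThresholdOptions.
    static class ThresholdOptionsSketch {
        private final int eventCount;

        ThresholdOptionsSketch(int eventCount) {
            this.eventCount = eventCount;
        }

        int getEventCount() {
            return eventCount;
        }
    }

    // Size the queue from configuration instead of a hard-coded 100,000.
    // LinkedBlockingQueue bounds capacity without pre-allocating a backing
    // array, so even a configured count of 10,000,000 costs no memory up front
    // (unlike ArrayBlockingQueue, which allocates its full array immediately).
    static BlockingQueue<String> buildEventQueue(ThresholdOptionsSketch thresholds) {
        return new LinkedBlockingQueue<>(thresholds.getEventCount());
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = buildEventQueue(new ThresholdOptionsSketch(10_000_000));
        System.out.println("capacity=" + queue.remainingCapacity());
    }
}
```

A byte-based threshold would still need separate handling, since a bounded queue counts events, not bytes.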
private static final Logger LOG = LoggerFactory.getLogger(S3SinkService.class);
private final S3SinkConfig s3SinkConfig;
private static final int EVENT_QUEUE_SIZE = 100000;
private BlockingQueue<Event> eventQueue;
Why are you using a `BlockingQueue` here?
These changes will be addressed as incremental changes, and they will be available in a separate PR.
        "Snapshot info : Byte_capacity = {} Bytes, Event_count = {} Records & Event_collection_duration = {} Sec",
        byteCount, eventCount, eventCollectionDuration);

if (s3SinkConfig.getBufferType().equals(BufferTypeOptions.LOCALFILE)) {
Every time the Data Prepper pipeline sends events to the sink, it calls `doOutput` on the `S3Sink` class. This, in turn, adds events to a `BlockingQueue` and then calls this method. This method then takes events from the `BlockingQueue` and adds them to a new `NavigableSet`. After that, it creates the buffer (local-file or in-memory). Then it calls the "accumulate" methods.

Those methods then immediately send to S3, so there is no buffer accumulation. The idea behind the accumulation is to keep data from multiple calls to `doOutput`. Otherwise, the user may get very small S3 objects.
These changes will be addressed as incremental changes, and they will be available in a separate PR.
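The accumulation the reviewer describes can be sketched as follows: the buffer survives across `doOutput` calls and is flushed only once a threshold is reached. All names here are illustrative; a real implementation would upload to S3 where `flush` merely counts.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class AccumulatingSinkSketch {

    private final List<String> buffer = new ArrayList<>();
    private final int eventCountThreshold;
    private int flushCount = 0;

    AccumulatingSinkSketch(int eventCountThreshold) {
        this.eventCountThreshold = eventCountThreshold;
    }

    // Called once per pipeline batch; the buffer survives across calls,
    // so small batches accumulate into one reasonably sized S3 object.
    void doOutput(Collection<String> records) {
        buffer.addAll(records);
        if (buffer.size() >= eventCountThreshold) {
            flush();
        }
    }

    private void flush() {
        // A real implementation would upload the buffered events to S3 here.
        flushCount++;
        buffer.clear();
    }

    int getFlushCount() {
        return flushCount;
    }

    int getBufferedEventCount() {
        return buffer.size();
    }

    public static void main(String[] args) {
        AccumulatingSinkSketch sink = new AccumulatingSinkSketch(5);
        sink.doOutput(List.of("a", "b"));   // below threshold: nothing uploaded
        sink.doOutput(List.of("c", "d"));   // still below threshold
        sink.doOutput(List.of("e"));        // reaches 5 events: one flush
        System.out.println("flushes=" + sink.getFlushCount()
                + " buffered=" + sink.getBufferedEventCount());
    }
}
```

With this shape, three small batches of two, two, and one event produce a single S3 object instead of three tiny ones.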
StringBuilder eventBuilder = new StringBuilder();
for (String event : bufferedEventSet) {
    eventBuilder.append(event);
Please provide `event.toJsonString()` here.
We are already applying `toJsonString()` in the jsonCodec parse method before control comes to this file.
File file = new File(s3ObjectFileName);
try (BufferedWriter eventWriter = new BufferedWriter(new FileWriter(s3ObjectFileName))) {
    for (String event : bufferedEventSet) {
        eventWriter.write(event);
Please provide `event.toJsonString()` here.
We are already applying `toJsonString()` in the jsonCodec parse method before control comes to this file.
public void bufferAccumulator(BlockingQueue<Event> eventQueue) {
    boolean isFileUploadedToS3 = Boolean.FALSE;
    DB eventDb = null;
    NavigableSet<String> bufferedEventSet = null;
Why is this a `NavigableSet`? I'm not even sure you need a collection. I suggested elsewhere that you pass a `List<Event>` into this method. Just provide that `List` to the accumulators. Those accumulators can then iterate over the list and add each `Event` into their respective buffers.
These changes will be addressed as incremental changes, and they will be available in a separate PR.
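The `List`-based interface suggested above might look like this sketch. The `EventBuffer` interface and all class names are illustrative stand-ins for the plugin's buffer types, not its actual API:

```java
import java.util.List;

public class ListAccumulatorSketch {

    // Illustrative buffer abstraction: each implementation decides how to store events.
    interface EventBuffer {
        void add(String eventJson);
        int size();
    }

    static class InMemoryBufferSketch implements EventBuffer {
        private final StringBuilder contents = new StringBuilder();
        private int count = 0;

        @Override
        public void add(String eventJson) {
            contents.append(eventJson).append('\n');
            count++;
        }

        @Override
        public int size() {
            return count;
        }
    }

    // Pass a plain List straight to the accumulator; no NavigableSet
    // (and no ordering or dedup semantics) is needed.
    static void accumulate(List<String> events, EventBuffer buffer) {
        for (String event : events) {
            buffer.add(event);
        }
    }

    public static void main(String[] args) {
        EventBuffer buffer = new InMemoryBufferSketch();
        accumulate(List.of("{\"a\":1}", "{\"b\":2}"), buffer);
        System.out.println("buffered=" + buffer.size());
    }
}
```

A side effect of the `NavigableSet` worth noting: as a set it silently drops duplicate events, which a `List` would not.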
try {
    StopWatch watch = new StopWatch();
    watch.start();
    eventDb = DBMaker.memoryDB().make();
Why do we have an `eventDb` here?
These changes will be addressed as incremental changes, and they will be available in a separate PR.
 */
public enum BufferTypeOptions {

    INMEMORY("in_memory", new InMemoryBuffer()),
These should be `IN_MEMORY` and `LOCAL_FILE`. You can use your IDE to rename these easily.
Followed the same lower-case notation as the s3-source `CompressionOption`.
 * An implementation class of path prefix and file pattern configuration Options
 */
public class ObjectKeyOptions {
    private static final String DEFAULT_OBJECT_NAME_PATTERN = "events-%{yyyy-MM-dd'T'hh-mm-ss}";
This value should be:

`events-%{yyyy-MM-dd'T'hh-mm-ss}-${epochSeconds}-${uniqueId}.${extension}`

I think there may be some broad issues here about format strings as well. So, my suggestion is that for now we remove the `name_pattern` option from customers. We can consider making this configurable later. For this PR, please always create the pattern as the following:

`events-%{yyyy-MM-dd'T'hh-mm-ss}-${epochSeconds}-${uniqueId}.${extension}`

The `epochSeconds` will be the same time as the string in `yyyy-MM-dd'T'hh-mm-ss`, but represented as seconds since the 1970 epoch. The unique Id is generated once per Data Prepper instance. And the `extension` comes from the codec.
Resolved.
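A sketch of generating the fixed pattern described above. The per-instance unique id and the codec extension are assumed inputs here, and `buildObjectName` is an illustrative helper name, not the plugin's actual method:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

public class ObjectKeySketch {

    // Matches the yyyy-MM-dd'T'hh-mm-ss portion of the reviewed pattern.
    private static final DateTimeFormatter TIME_PART =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'hh-mm-ss").withZone(ZoneOffset.UTC);

    // uniqueId would be generated once per Data Prepper instance;
    // extension would come from the codec (e.g. "json").
    static String buildObjectName(Instant now, String uniqueId, String extension) {
        return String.format("events-%s-%d-%s.%s",
                TIME_PART.format(now), now.getEpochSecond(), uniqueId, extension);
    }

    public static void main(String[] args) {
        String name = buildObjectName(Instant.now(),
                UUID.randomUUID().toString(), "json");
        System.out.println(name);
    }
}
```

Note that both the formatted time and `epochSeconds` come from the same `Instant`, so the two representations of the timestamp always agree, as the reviewer requires.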
private String pathPrefix;

@JsonProperty("name_pattern")
private String namePattern = DEFAULT_OBJECT_NAME_PATTERN;
As noted above, let's not make this configurable for now.
Resolved.
@JsonProperty("path_prefix")
private String pathPrefix;

@JsonProperty("name_pattern")
Let's not expose this as a feature for pipeline authors yet.

- Remove these two lines:

      @JsonProperty("name_pattern")
      private String namePattern = DEFAULT_OBJECT_NAME_PATTERN;

- Change `getNamePattern` to this:

      public String getNamePattern() {
          return DEFAULT_OBJECT_NAME_PATTERN;
      }

These changes should remove this as a user configuration. And you won't have to change the code that uses this.
Resolved.
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Description
Issues Resolved
Github issue: #1048
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.