-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Github-issue#1048 : s3-sink with local-file buffer implementation. #2645
Github-issue#1048 : s3-sink with local-file buffer implementation. #2645
Conversation
/** | ||
* Defines all the buffer types enumerations. | ||
*/ | ||
public enum BufferTypeOptions { | ||
|
||
INMEMORY, | ||
LOCALFILE | ||
LOCALFILE("local_file", new LocalFileBuffer()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought the plan is to support both INMEMORY and LOCALFILE. Isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are supporting both INMEMORY and LOCALFILE buffer type. The INMEMORY buffer functionality was raised as separate PR: #2623
4bfe6b1
to
2512d1c
Compare
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Codecov Report
@@ Coverage Diff @@
## main #2645 +/- ##
============================================
+ Coverage 93.56% 93.61% +0.04%
Complexity 2254 2254
============================================
Files 262 262
Lines 6308 6308
Branches 521 521
============================================
+ Hits 5902 5905 +3
+ Misses 268 266 -2
+ Partials 138 137 -1 |
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
@ashoktelukuntla \ @dlvenable |
|
||
LocalFileBuffer() { | ||
try { | ||
localFile = new File(String.valueOf(UUID.randomUUID())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use a temp file here. It could be in the system default temp, or we can allow a user to configure this directory.
Java provides two createTempFile
functions that can help here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed. Implemented using Java provided functionality.
public class LocalFileBuffer implements Buffer { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class); | ||
private BufferedOutputStream bufferedOutputStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend making this just an OutputStream
so that the code doesn't have to change if you change this stream class.
private OutputStream outputStream;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
*/ | ||
@Override | ||
public void writeEvent(byte[] bytes) throws IOException { | ||
bufferedOutputStream.write(bytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need some synchronization for these writes. Or the events may end up out-of-order.
We can add it here (and to the InMemoryBuffer
). Or perhaps we can synchronize in the S3SinkService
itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we have synchronized in the S3SinkService
itself.
Assertions.assertNotNull(localFileBufferFactory); | ||
Buffer buffer = localFileBufferFactory.getBuffer(); | ||
Assertions.assertNotNull(buffer); | ||
assertThat(buffer, instanceOf(Buffer.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should assert that this is an instance of LocalFileBuffer
as well.
assertThat(buffer, instanceOf(LocalFileBuffer.class))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deepaksahu562 , Did you push the change here? I don't see a new assertion for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dlvenable, Earlier, I didn't understand. Now I can understand and modify as requested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be as simple as changing this line to:
assertThat(buffer, instanceOf(LocalFileBuffer.class))
Here it is with a little more context.
Buffer buffer = localFileBufferFactory.getBuffer();
Assertions.assertNotNull(buffer);
assertThat(buffer, instanceOf(LocalFileBuffer.class))
We already know this is a Buffer
. That is handled statically by Java. But, we want to know that this is returning the LocalFileBuffer
and not an InMemoryBuffer
(or some other buffer sub-class).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change this line per my latest comment. Then we should be good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your clarification, Modified as you suggested.
} | ||
assertThat(localFileBuffer.getSize(), greaterThan(1l)); | ||
assertThat(localFileBuffer.getEventCount(), equalTo(55)); | ||
assertThat(localFileBuffer.getDuration(), greaterThanOrEqualTo(0L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should be able to assert the exact duration here.
assertThat(localFileBuffer.getDuration(), equalTo(55 * 1000L));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
assertDoesNotThrow(() -> { | ||
localFileBuffer.flushToS3(s3Client, BUCKET_NAME, "log.txt"); | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to verify that the file was flushed to S3. Since this is not an integration test, you will do this via mocking.
It should look something like the following:
ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
verify(s3Client).putObject(putObjectRequestArgumentCaptor.capture(), any(RequestBody.class));
PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue();
// add assertions for the bucket and key values.
Also, this should validate that the file was actually deleted. You will do this with real files (not mocks).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified and addressed as per suggestions.
public class LocalFileBufferFactory implements BufferFactory { | ||
@Override | ||
public Buffer getBuffer() { | ||
return new LocalFileBuffer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can probably improve your testing by choosing the File
here and passing it in as a parameter.
File = File.createTempDir(...);
return new LocalFileBuffer(file);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
@@ -15,7 +18,16 @@ | |||
*/ | |||
public enum BufferTypeOptions { | |||
|
|||
INMEMORY("in_memory", new InMemoryBuffer()); | |||
INMEMORY("in_memory", new InMemoryBuffer()), | |||
LOCALFILE("local_file", new Object() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you change to make this anonymous class? I think you can continue to make this work with new LocalFileBufferFactory
. This helps with unit testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified with new LocalFileBufferFactory
.
Assertions.assertNotNull(localFileBufferFactory); | ||
Buffer buffer = localFileBufferFactory.getBuffer(); | ||
Assertions.assertNotNull(buffer); | ||
assertThat(buffer, instanceOf(Buffer.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deepaksahu562 , Did you push the change here? I don't see a new assertion for this.
File tempFile = null; | ||
Buffer localfileBuffer = null; | ||
try { | ||
tempFile = File.createTempFile(PREFIX, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the suffix should be .log
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified.
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
this.option = option.toLowerCase(); | ||
this.bufferType = bufferType; | ||
} | ||
|
||
public Buffer getBufferType() { | ||
public BufferFactory getBufferType() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is a good change. Thanks!
Assertions.assertNotNull(localFileBufferFactory); | ||
Buffer buffer = localFileBufferFactory.getBuffer(); | ||
Assertions.assertNotNull(buffer); | ||
assertThat(buffer, instanceOf(Buffer.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change this line per my latest comment. Then we should be good.
Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
Description
Issues Resolved
GitHub-issue #1048
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.