OpenSearch Sink should make the number of retries configurable - Issue #2291 #2339
Conversation
…opensearch-project#2291 Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Codecov Report
@@ Coverage Diff @@
## main #2339 +/- ##
============================================
- Coverage 93.81% 93.65% -0.17%
+ Complexity 1977 1975 -2
============================================
Files 229 229
Lines 5499 5499
Branches 442 442
============================================
- Hits 5159 5150 -9
- Misses 230 241 +11
+ Partials 110 108 +2

see 1 file with indirect coverage changes
public Builder withDlqFile(final String dlqFile) {
    checkNotNull(dlqFile, "dlqFile cannot be null.");
    this.dlqFile = dlqFile;
    return this;
}

public Builder withMaxRetries(final Integer maxRetries) {
Can we provide min and max validations around maxRetries? There are some edge cases, like negative numbers and Integer.MAX_VALUE, that I am not convinced we should support.
Integer.MAX_VALUE should be OK to support indefinite retries.
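The validation being discussed could look something like the sketch below. This is not the actual Data Prepper code; the class and method names mirror the diff, but the bounds check and the `Integer.MAX_VALUE` default (meaning "retry indefinitely") are illustrative assumptions.

```java
// Hypothetical sketch of the suggested validation; not the actual Data Prepper builder.
class RetryConfiguration {
    private final int maxRetries;

    private RetryConfiguration(final int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    static class Builder {
        // Defaulting to Integer.MAX_VALUE effectively preserves "retry forever".
        private int maxRetries = Integer.MAX_VALUE;

        public Builder withMaxRetries(final Integer maxRetries) {
            // Reject null and non-positive values up front, per the review comment.
            if (maxRetries == null || maxRetries < 1) {
                throw new IllegalArgumentException("maxRetries must be a positive integer.");
            }
            this.maxRetries = maxRetries;
            return this;
        }

        public RetryConfiguration build() {
            return new RetryConfiguration(maxRetries);
        }
    }
}
```

Validating in the builder keeps bad values from ever reaching the sink's retry loop, so the loop itself never has to defend against a negative limit.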
@@ -138,10 +138,12 @@ private void doInitializeInternal() throws IOException {
OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
openSearchClient = new OpenSearchClient(transport);
bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder());
final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Let's update the README.md file with this new configuration option.
@@ -68,7 +68,7 @@ public void testCanRetry() {
AccumulatingBulkRequest accumulatingBulkRequest = mock(AccumulatingBulkRequest.class);
Which of these tests validates that we are honoring the configured retry config?
Added new tests to test retry config.
@@ -24,6 +24,8 @@
int getOperationsCount();

int incrementAndGetRetryCount();
I think this is not the right place to have this. The purpose of this class is to accumulate documents to know when to flush; handling retries is a different concern. Also, each new retry may issue a new AccumulatingBulkRequest, which would lose the count.
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@@ -26,6 +26,10 @@
int incrementAndGetRetryCount();

void setRetryCount(int count);
Adding the setter will help with the issue of not losing the count. But I still think there should be a distinct class for handling the retry counts. The AccumulatingBulkRequest can focus on counting accumulations, which is its current responsibility. Another class like RetryableBulkRequest could handle the retry counting.
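The separation the reviewer proposes could be sketched as a thin wrapper that owns the retry count and delegates the rest. The name RetryableBulkRequest comes from the comment above, but the shape of the class is an illustrative assumption, not code from this PR.

```java
// Hypothetical sketch of the reviewer's RetryableBulkRequest suggestion.
// The wrapper owns retry bookkeeping; the wrapped request keeps its single
// responsibility of accumulating documents. The generic parameter stands in
// for the real AccumulatingBulkRequest type.
class RetryableBulkRequest<T> {
    private final T request;     // the accumulated bulk request being retried
    private int retryCount = 0;  // survives even if the inner request is rebuilt per retry

    public RetryableBulkRequest(final T request) {
        this.request = request;
    }

    public T getRequest() {
        return request;
    }

    public int incrementAndGetRetryCount() {
        return ++retryCount;
    }

    public int getRetryCount() {
        return retryCount;
    }
}
```

With this split, a retry that builds a fresh bulk request only replaces the wrapped value, so the count is never lost and no setRetryCount is needed.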
…pper into os-limit-retries rebasing
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@@ -19,6 +19,7 @@ public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkReques
private BulkRequest.Builder bulkRequestBuilder;
private long currentBulkSize = 0L;
private int operationCount = 0;
private int retryCount = 0;
I think this was left behind from the previous revision.
private BulkResponse bulkMaxRetriesResponse(final BulkRequest bulkRequest) {
    final int requestSize = bulkRequest.operations().size();
    assert requestSize == 4;
Please use assertThat(requestSize, equalTo(4));
} else {
    final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
    if (firstAttempt) {
    if (retryCount == 1) {
It might be good to create a variable boolean firstAttempt = retryCount == 1. It is a little unclear in this function that retryCount == 1 actually means that there have been no retries and this is the first attempt.
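The readability point is small but concrete: retryCount starts at 1 on the very first attempt, so naming the condition documents that convention. A minimal sketch, with an illustrative helper name that is not in the PR:

```java
// Illustrative helper only: shows how naming the retryCount == 1 condition
// makes the "first attempt vs. retry" convention self-documenting.
class RetryAttempts {
    static String describeAttempt(final int retryCount) {
        // retryCount == 1 means no retries have happened yet.
        final boolean firstAttempt = retryCount == 1;
        return firstAttempt ? "first attempt" : "retry #" + (retryCount - 1);
    }
}
```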
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
final BackOffUtils backOffUtils,
final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
    boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
minor: this can be final to avoid unintentional future re-assignment
final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
    boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
    if (!Objects.isNull(bulkResponse) && doRetry && retryCount == 1) { // first attempt
Why does doRetry need to be true here? If it is false and some documents were successfully sent on the first attempt, we should increment the counter for those documents.
@cmanning09 I guess you are right. I just refactored the existing code; it looks like even the existing code increments the stat only when canRetry is true. Modified the code to increment the stat even when doRetry is not true.
bulkRequestNumberOfRetries.increment();
} else {
    if (doRetry && retryCount >= maxRetries) {
        e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries));
We should avoid re-assignment of method parameters. This can be dangerous and leads to confusing code.
In this case, the caller gets the new exception returned too, so that it gets passed back. Basically it is an in-and-out parameter to the function handleRetriesAndFailures.
If we need to return an object, we should use the standard mechanism and return it as a return value. The current behavior is a side effect that happens conditionally and is not documented. I am concerned this will be problematic in the future.
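The refactor being asked for could look like the sketch below: instead of re-assigning the `e` parameter as a hidden in/out channel, the method returns the (possibly replaced) exception explicitly. Class and method names here are illustrative, not the actual Data Prepper code.

```java
// Hypothetical sketch: return the exception instead of mutating a parameter.
class RetryLimiter {
    private final int maxRetries;

    RetryLimiter(final int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Returns the original cause unchanged, or a new RuntimeException once the
    // configured retry limit has been reached. No parameter is re-assigned.
    Exception checkRetryLimit(final Exception cause, final int retryCount, final boolean doRetry) {
        if (doRetry && retryCount >= maxRetries) {
            return new RuntimeException(String.format(
                    "Number of retries reached the limit of max retries (configured value %d)", maxRetries));
        }
        return cause;
    }
}
```

The caller then writes `e = limiter.checkRetryLimit(e, retryCount, doRetry);`, which makes the data flow visible at the call site instead of relying on an undocumented side effect inside the callee.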
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Description
The OpenSearch sink retries retryable operations forever. This change adds a configuration option to the OpenSearch sink to limit the number of retries.
Issues Resolved
Resolves #2291
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.