OpenSearch Sink should make the number of retries configurable - Issue #2291 #2339

Merged
merged 7 commits into opensearch-project:main on Mar 16, 2023

Conversation

kkondaka
Collaborator

@kkondaka kkondaka commented Mar 1, 2023

Description

The OpenSearch sink retries retryable operations indefinitely. This change adds a configuration option to the OpenSearch sink to limit the number of retries.
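
For context, a minimal sketch of the shape of this option as the sink consumes it (the getter name matches the diff later in this conversation; the default shown here is an assumption, not taken from the merged code):

    // Sketch only: RetryConfiguration exposing a configurable retry limit.
    public class RetryConfiguration {
        // Assumption: defaulting to Integer.MAX_VALUE preserves the previous
        // "retry forever" behavior when no limit is configured.
        private final int maxRetries;

        public RetryConfiguration(final Integer maxRetries) {
            this.maxRetries = (maxRetries == null) ? Integer.MAX_VALUE : maxRetries;
        }

        public int getMaxRetries() {
            return maxRetries;
        }
    }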

Issues Resolved

Resolves #2291

Check List

  • [x] New functionality includes testing.
  • [ ] New functionality has been documented.
    • [ ] New functionality has javadoc added
  • [x] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@kkondaka kkondaka requested a review from a team as a code owner March 1, 2023 06:24
@codecov-commenter

codecov-commenter commented Mar 1, 2023

Codecov Report

Merging #2339 (a166221) into main (6d4eb42) will decrease coverage by 0.17%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##               main    #2339      +/-   ##
============================================
- Coverage     93.81%   93.65%   -0.17%     
+ Complexity     1977     1975       -2     
============================================
  Files           229      229              
  Lines          5499     5499              
  Branches        442      442              
============================================
- Hits           5159     5150       -9     
- Misses          230      241      +11     
+ Partials        110      108       -2     

see 1 file with indirect coverage changes


public Builder withDlqFile(final String dlqFile) {
checkNotNull(dlqFile, "dlqFile cannot be null.");
this.dlqFile = dlqFile;
return this;
}

public Builder withMaxRetries(final Integer maxRetries) {
Contributor

Can we provide min and max validations around maxRetries? There are some edge cases, like negative numbers and Integer.MAX_VALUE, that I am not convinced we should support.

Collaborator Author

@kkondaka kkondaka Mar 6, 2023

Integer.MAX_VALUE should be OK to support; it allows indefinite retries.
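
A minimal sketch of what that validation could look like, assuming a lower bound of 1 and keeping Integer.MAX_VALUE as the "retry indefinitely" value (hypothetical; not the code merged in this PR):

    public Builder withMaxRetries(final Integer maxRetries) {
        checkNotNull(maxRetries, "maxRetries cannot be null.");
        // Reject zero and negative values; Integer.MAX_VALUE stays allowed so
        // users can still opt into effectively indefinite retries.
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be a positive integer.");
        }
        this.maxRetries = maxRetries;
        return this;
    }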

@@ -138,10 +138,12 @@ private void doInitializeInternal() throws IOException {
OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
openSearchClient = new OpenSearchClient(transport);
bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder());
final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Contributor

Let's update the README.md file with this new configuration option.

@@ -68,7 +68,7 @@ public void testCanRetry() {
AccumulatingBulkRequest accumulatingBulkRequest = mock(AccumulatingBulkRequest.class);
Contributor

Which of these tests validates that we are honoring the configured retry config?

Collaborator Author

Added new tests to test retry config.

@@ -24,6 +24,8 @@

int getOperationsCount();

int incrementAndGetRetryCount();
Member

I think this is not the right place to have this.

The purpose of this class is to accumulate documents to know when to flush. This is a different concern than handling retries.

Also, each new retry may issue a new AccumulatingBulkRequest which would lose the count.

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@@ -26,6 +26,10 @@

int incrementAndGetRetryCount();

void setRetryCount(int count);
Member

Adding the setter will help with the issue of not losing the count. But I still think there should be a distinct class for handling the retry counts. The AccumulatingBulkRequest can focus on counting accumulations, which is its current responsibility.

Another class like RetryableBulkRequest could handle the retry counting.
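
A sketch of that separation, with a RetryableBulkRequest wrapper owning the retry count (illustrative only; this class does not exist in the PR):

    // Illustrative: retry bookkeeping lives outside AccumulatingBulkRequest,
    // which keeps its single responsibility of accumulating operations.
    public class RetryableBulkRequest {
        private final AccumulatingBulkRequest bulkRequest;
        private int retryCount = 0;

        public RetryableBulkRequest(final AccumulatingBulkRequest bulkRequest) {
            this.bulkRequest = bulkRequest;
        }

        public AccumulatingBulkRequest getBulkRequest() {
            return bulkRequest;
        }

        // The count survives even when a fresh AccumulatingBulkRequest is
        // built for the remaining failed operations on a retry.
        public int incrementAndGetRetryCount() {
            return ++retryCount;
        }
    }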

@@ -19,6 +19,7 @@ public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkRequest
private BulkRequest.Builder bulkRequestBuilder;
private long currentBulkSize = 0L;
private int operationCount = 0;
private int retryCount = 0;
Member

I think this was left behind from the previous revision.


private BulkResponse bulkMaxRetriesResponse(final BulkRequest bulkRequest) {
final int requestSize = bulkRequest.operations().size();
assert requestSize == 4;
Member

Please use assertThat(requestSize, equalTo(4));

} else {
final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
if (firstAttempt) {
if (retryCount == 1) {
Member

It might be good to create a variable: boolean firstAttempt = retryCount == 1. It is a little unclear in this function that retryCount == 1 actually means there have been no retries and this is the first attempt.
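
Roughly (a sketch of the suggested readability change; surrounding code elided):

    // retryCount starts at 1, so this names the first attempt explicitly.
    final boolean firstAttempt = retryCount == 1;
    if (firstAttempt) {
        // ... first-attempt handling as before ...
    }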

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
final BackOffUtils backOffUtils,
final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
Contributor

minor: this can be final to avoid unintentional future re-assignment

final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
if (!Objects.isNull(bulkResponse) && doRetry && retryCount == 1) { // first attempt
Contributor

Why does doRetry need to be true here? If it is false and some documents were successfully sent on the first attempt, we should increment the counter for those documents.

Collaborator Author

@cmanning09 I guess you are right. I just refactored the existing code, and it looks like even the existing code increments the stat only when canRetry is true. Modified the code to increment the stat even when doRetry is false.
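
The resulting condition, roughly (a sketch; the stat update itself is not shown in the diff above, so the counter name here is hypothetical):

    final boolean firstAttempt = retryCount == 1;
    if (!Objects.isNull(bulkResponse) && firstAttempt) {
        // Increment even when doRetry is false: documents successfully sent
        // on the first attempt should be counted regardless of whether the
        // remaining documents will be retried.
        sentDocumentsOnFirstAttemptCounter.increment(); // hypothetical counter
    }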

bulkRequestNumberOfRetries.increment();
} else {
if (doRetry && retryCount >= maxRetries) {
e = new RuntimeException(String.format("Number of retries reached the limit of max retries(configured value %d)", maxRetries));
Contributor

We should avoid re-assignment of method parameters. This can be dangerous and leads to confusing code.

Collaborator Author

In this case, the caller gets the new exception returned too, so it gets passed back. Basically, it is an in-and-out parameter to the function handleRetriesAndFailures.

Contributor

If we need to return an object, we should use the standard mechanism for returning an object and return it as a return value. The current behavior is a side effect that happens conditionally and is not documented. I am concerned this will be problematic in the future.
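
One way to make that contract explicit, per the suggestion above: return the (possibly replaced) exception instead of reassigning the parameter (a sketch with a simplified signature; the real handleRetriesAndFailures takes more parameters):

    private Exception handleRetriesAndFailures(final boolean doRetry,
                                               final int retryCount,
                                               final Exception e) {
        if (doRetry && retryCount >= maxRetries) {
            // Returned, not assigned to the parameter, so the caller
            // explicitly receives the replacement exception.
            return new RuntimeException(String.format(
                    "Number of retries reached the limit of max retries (configured value %d)",
                    maxRetries));
        }
        return e;
    }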

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@dlvenable dlvenable merged commit 5cf8659 into opensearch-project:main Mar 16, 2023
@kkondaka kkondaka deleted the os-limit-retries branch July 13, 2023 04:31