-
Notifications
You must be signed in to change notification settings - Fork 188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpenSearch Sink should make the number of retries configurable - Issue #2291 #2339
Changes from 5 commits
dc1dc6f
739c543
6b6a0c1
c88e995
6ec4834
a166221
35b5e18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,9 @@ | |
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.HashMap; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Supplier; | ||
|
@@ -94,6 +97,8 @@ public final class BulkRetryStrategy { | |
private final BiConsumer<BulkOperation, Throwable> logFailure; | ||
private final PluginMetrics pluginMetrics; | ||
private final Supplier<AccumulatingBulkRequest> bulkRequestSupplier; | ||
private final int maxRetries; | ||
private final Map<AccumulatingBulkRequest<BulkOperation, BulkRequest>, Integer> retryCountMap; | ||
|
||
private final Counter sentDocumentsCounter; | ||
private final Counter sentDocumentsOnFirstAttemptCounter; | ||
|
@@ -110,11 +115,14 @@ public final class BulkRetryStrategy { | |
public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOperation, BulkRequest>, BulkResponse> requestFunction, | ||
final BiConsumer<BulkOperation, Throwable> logFailure, | ||
final PluginMetrics pluginMetrics, | ||
final int maxRetries, | ||
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier) { | ||
this.requestFunction = requestFunction; | ||
this.logFailure = logFailure; | ||
this.pluginMetrics = pluginMetrics; | ||
this.bulkRequestSupplier = bulkRequestSupplier; | ||
this.maxRetries = maxRetries; | ||
this.retryCountMap = new HashMap<>(); | ||
|
||
sentDocumentsCounter = pluginMetrics.counter(DOCUMENTS_SUCCESS); | ||
sentDocumentsOnFirstAttemptCounter = pluginMetrics.counter(DOCUMENTS_SUCCESS_FIRST_ATTEMPT); | ||
|
@@ -134,7 +142,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte | |
// TODO: replace with custom backoff policy setting including maximum interval between retries | ||
final BackOffUtils backOffUtils = new BackOffUtils( | ||
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), Integer.MAX_VALUE).iterator()); | ||
handleRetry(bulkRequest, null, backOffUtils, true); | ||
handleRetry(bulkRequest, null, backOffUtils); | ||
} | ||
|
||
public boolean canRetry(final BulkResponse response) { | ||
|
@@ -152,16 +160,46 @@ public static boolean canRetry(final Exception e) { | |
!NON_RETRY_STATUS.contains(((OpenSearchException) e).status().getStatus()))); | ||
} | ||
|
||
private void handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry, | ||
final int retryCount, | ||
final BackOffUtils backOffUtils, | ||
final BulkResponse bulkResponse, | ||
Exception e) throws InterruptedException { | ||
boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e); | ||
if (!Objects.isNull(bulkResponse) && doRetry && retryCount == 1) { // first attempt | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cmanning09 I guess you are right. I just refactored the existing code. It looks like even existing code is incrementing stat only when |
||
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { | ||
if (bulkItemResponse.error() == null) { | ||
sentDocumentsOnFirstAttemptCounter.increment(); | ||
} | ||
} | ||
} | ||
if (doRetry && retryCount < maxRetries) { | ||
handleRetry(bulkRequestForRetry, bulkResponse, backOffUtils); | ||
bulkRequestNumberOfRetries.increment(); | ||
} else { | ||
if (doRetry && retryCount >= maxRetries) { | ||
e = new RuntimeException(String.format("Number of retries reached the limit of max retries(configured value %d)", maxRetries)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid re-assignment of method parameters. This can be dangerous and leads to confusing code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, the caller gets the new exception returned too, so that it gets passed back. Basically it is an in & out parameter to the function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we need to return an object we should use the standard mechanism for returning an object and return the object as a return value. This current behavior is a side affect that happens conditionally and is not documented. I am concerned this will be problematic in the future. |
||
} | ||
if (Objects.isNull(e)) { | ||
handleFailures(bulkRequestForRetry, bulkResponse.items()); | ||
} else { | ||
handleFailures(bulkRequestForRetry, e); | ||
} | ||
bulkRequestFailedCounter.increment(); | ||
} | ||
} | ||
|
||
private void handleRetry(final AccumulatingBulkRequest request, final BulkResponse response, | ||
final BackOffUtils backOffUtils, final boolean firstAttempt) throws InterruptedException { | ||
final BackOffUtils backOffUtils) throws InterruptedException { | ||
final AccumulatingBulkRequest<BulkOperation, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response); | ||
int retryCount = retryCountMap.get(bulkRequestForRetry); | ||
if (backOffUtils.hasNext()) { | ||
// Wait for backOff duration | ||
backOffUtils.next(); | ||
final BulkResponse bulkResponse; | ||
try { | ||
bulkResponse = requestFunction.apply(bulkRequestForRetry); | ||
} catch (final Exception e) { | ||
} catch (Exception e) { | ||
if (e instanceof OpenSearchException) { | ||
int status = ((OpenSearchException) e).status().getStatus(); | ||
if (NOT_ALLOWED_ERRORS.contains(status)) { | ||
|
@@ -178,47 +216,35 @@ private void handleRetry(final AccumulatingBulkRequest request, final BulkRespon | |
bulkRequestBadErrors.increment(); | ||
} | ||
} | ||
|
||
if (canRetry(e)) { | ||
handleRetry(bulkRequestForRetry, null, backOffUtils, false); | ||
bulkRequestNumberOfRetries.increment(); | ||
} else { | ||
handleFailures(bulkRequestForRetry, e); | ||
bulkRequestFailedCounter.increment(); | ||
} | ||
|
||
handleRetriesAndFailures(bulkRequestForRetry, retryCount, backOffUtils, null, e); | ||
return; | ||
} | ||
if (bulkResponse.errors()) { | ||
if (canRetry(bulkResponse)) { | ||
if (firstAttempt) { | ||
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { | ||
if (bulkItemResponse.error() == null) { | ||
sentDocumentsOnFirstAttemptCounter.increment(); | ||
} | ||
} | ||
} | ||
handleRetry(bulkRequestForRetry, bulkResponse, backOffUtils, false); | ||
} else { | ||
handleFailures(bulkRequestForRetry, bulkResponse.items()); | ||
} | ||
handleRetriesAndFailures(bulkRequestForRetry, retryCount, backOffUtils, bulkResponse, null); | ||
} else { | ||
final int numberOfDocs = bulkRequestForRetry.getOperationsCount(); | ||
if (firstAttempt) { | ||
if (retryCount == 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be good to create a variable |
||
sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs); | ||
} | ||
sentDocumentsCounter.increment(bulkRequestForRetry.getOperationsCount()); | ||
retryCountMap.remove(bulkRequestForRetry); | ||
} | ||
} | ||
} | ||
|
||
private AccumulatingBulkRequest<BulkOperation, BulkRequest> createBulkRequestForRetry( | ||
final AccumulatingBulkRequest<BulkOperation, BulkRequest> request, final BulkResponse response) { | ||
int newCount = retryCountMap.containsKey(request) ? (retryCountMap.get(request) + 1) : 1; | ||
if (response == null) { | ||
retryCountMap.put(request, newCount); | ||
// first attempt or retry due to Exception | ||
return request; | ||
} else { | ||
final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get(); | ||
if (request != requestToReissue) { | ||
retryCountMap.put(requestToReissue, newCount); | ||
retryCountMap.remove(request); | ||
} | ||
int index = 0; | ||
for (final BulkResponseItem bulkItemResponse : response.items()) { | ||
if (bulkItemResponse.error() != null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,10 +138,12 @@ private void doInitializeInternal() throws IOException { | |
OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper()); | ||
openSearchClient = new OpenSearchClient(transport); | ||
bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder()); | ||
final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's update the README.md file with this new configuration option. |
||
bulkRetryStrategy = new BulkRetryStrategy( | ||
bulkRequest -> openSearchClient.bulk(bulkRequest.getRequest()), | ||
this::logFailure, | ||
pluginMetrics, | ||
maxRetries, | ||
bulkRequestSupplier); | ||
|
||
objectMapper = new ObjectMapper(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,29 +11,45 @@ | |
|
||
public class RetryConfiguration { | ||
public static final String DLQ_FILE = "dlq_file"; | ||
public static final String MAX_RETRIES = "max_retries"; | ||
|
||
private final String dlqFile; | ||
private final int maxRetries; | ||
|
||
public String getDlqFile() { | ||
return dlqFile; | ||
} | ||
|
||
public int getMaxRetries() { | ||
if (maxRetries < 1) { | ||
throw new IllegalArgumentException("max_retries must be > 1"); | ||
} | ||
return maxRetries; | ||
} | ||
|
||
public static class Builder { | ||
private String dlqFile; | ||
private int maxRetries = Integer.MAX_VALUE; | ||
|
||
public Builder withDlqFile(final String dlqFile) { | ||
checkNotNull(dlqFile, "dlqFile cannot be null."); | ||
this.dlqFile = dlqFile; | ||
return this; | ||
} | ||
|
||
public Builder withMaxRetries(final Integer maxRetries) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we provide more min and max validations around the maxRetries. There are some edge cases like negative numbers and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
checkNotNull(maxRetries, "maxRetries cannot be null."); | ||
this.maxRetries = maxRetries; | ||
return this; | ||
} | ||
public RetryConfiguration build() { | ||
return new RetryConfiguration(this); | ||
} | ||
} | ||
|
||
private RetryConfiguration(final Builder builder) { | ||
this.dlqFile = builder.dlqFile; | ||
this.maxRetries = builder.maxRetries; | ||
} | ||
|
||
public static RetryConfiguration readRetryConfig(final PluginSetting pluginSetting) { | ||
|
@@ -42,6 +58,10 @@ public static RetryConfiguration readRetryConfig(final PluginSetting pluginSetti | |
if (dlqFile != null) { | ||
builder = builder.withDlqFile(dlqFile); | ||
} | ||
final Integer maxRetries = (Integer) pluginSetting.getAttributeFromSettings(MAX_RETRIES); | ||
if (maxRetries != null) { | ||
builder = builder.withMaxRetries(maxRetries); | ||
} | ||
return builder.build(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkReques | |
private BulkRequest.Builder bulkRequestBuilder; | ||
private long currentBulkSize = 0L; | ||
private int operationCount = 0; | ||
private int retryCount = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this was left behind from the previous revision. |
||
private BulkRequest builtRequest; | ||
|
||
public JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: this can be final to avoid unintentional future re-assignment