OpenSearch Sink should make the number of retries configurable - Issue #2291 #2339

Merged 7 commits on Mar 16, 2023
Changes from 5 commits
4 changes: 4 additions & 0 deletions data-prepper-plugins/opensearch/README.md
@@ -21,6 +21,7 @@ pipeline:
password: YOUR_PASSWORD_HERE
index_type: trace-analytics-raw
dlq_file: /your/local/dlq-file
max_retries: 20
bulk_size: 4
```

@@ -139,6 +140,9 @@ e.g. [otel-v1-apm-span-index-template.json](https://github.com/opensearch-projec
- `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null.
If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`).

- `max_retries` (optional): The maximum number of times the OpenSearch sink attempts to push data to the OpenSearch server before treating the request as failed. Defaults to `Integer.MAX_VALUE`, which effectively retries indefinitely; because exponential backoff keeps increasing the wait between attempts, retries become progressively less frequent.
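The interaction between a retry cap and exponential backoff can be illustrated with a minimal sketch. All names here are hypothetical stand-ins for illustration, not Data Prepper's actual implementation:

```java
// Hypothetical sketch of a capped retry loop with exponential backoff.
// None of these names come from Data Prepper; they only illustrate the
// semantics described above.
public class RetrySketch {
    static long backoffMillis(int attempt, long baseMillis, long capMillis) {
        // Exponential backoff: base * 2^(attempt - 1), capped so the wait
        // between attempts cannot grow without bound.
        long wait = baseMillis << Math.min(attempt - 1, 20);
        return Math.min(wait, capMillis);
    }

    // Returns the number of attempts made before success or giving up.
    static int runWithRetries(int maxRetries, java.util.function.IntPredicate attemptSucceeds)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
            if (attempt < maxRetries) {
                Thread.sleep(backoffMillis(attempt, 50, 60_000));
            }
        }
        return maxRetries; // gave up after maxRetries attempts
    }

    public static void main(String[] args) throws InterruptedException {
        // Succeeds on the 3rd attempt; with a cap of 20 the loop stops at 3.
        System.out.println(runWithRetries(20, a -> a == 3));
    }
}
```

With the default cap of `Integer.MAX_VALUE`, the loop above would in practice never give up, which matches the documented behavior when `max_retries` is not provided.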

- `bulk_size` (optional): A long representing the maximum bulk request size in MB. Defaults to 5 MB. If set to a value less than 0, all the records received from the upstream prepper at a time are sent as a single bulk request. If a single record is larger than the configured bulk size, it is sent as a bulk request containing a single document.
@@ -22,6 +22,9 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -94,6 +97,8 @@ public final class BulkRetryStrategy {
private final BiConsumer<BulkOperation, Throwable> logFailure;
private final PluginMetrics pluginMetrics;
private final Supplier<AccumulatingBulkRequest> bulkRequestSupplier;
private final int maxRetries;
private final Map<AccumulatingBulkRequest<BulkOperation, BulkRequest>, Integer> retryCountMap;

private final Counter sentDocumentsCounter;
private final Counter sentDocumentsOnFirstAttemptCounter;
@@ -110,11 +115,14 @@ public final class BulkRetryStrategy {
public BulkRetryStrategy(final RequestFunction<AccumulatingBulkRequest<BulkOperation, BulkRequest>, BulkResponse> requestFunction,
final BiConsumer<BulkOperation, Throwable> logFailure,
final PluginMetrics pluginMetrics,
final int maxRetries,
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier) {
this.requestFunction = requestFunction;
this.logFailure = logFailure;
this.pluginMetrics = pluginMetrics;
this.bulkRequestSupplier = bulkRequestSupplier;
this.maxRetries = maxRetries;
this.retryCountMap = new HashMap<>();

sentDocumentsCounter = pluginMetrics.counter(DOCUMENTS_SUCCESS);
sentDocumentsOnFirstAttemptCounter = pluginMetrics.counter(DOCUMENTS_SUCCESS_FIRST_ATTEMPT);
@@ -134,7 +142,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte
// TODO: replace with custom backoff policy setting including maximum interval between retries
final BackOffUtils backOffUtils = new BackOffUtils(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), Integer.MAX_VALUE).iterator());
handleRetry(bulkRequest, null, backOffUtils, true);
handleRetry(bulkRequest, null, backOffUtils);
}

public boolean canRetry(final BulkResponse response) {
@@ -152,16 +160,46 @@ public static boolean canRetry(final Exception e) {
!NON_RETRY_STATUS.contains(((OpenSearchException) e).status().getStatus())));
}

private void handleRetriesAndFailures(final AccumulatingBulkRequest bulkRequestForRetry,
final int retryCount,
final BackOffUtils backOffUtils,
final BulkResponse bulkResponse,
Exception e) throws InterruptedException {
boolean doRetry = (Objects.isNull(e)) ? canRetry(bulkResponse) : canRetry(e);
Review comment (Contributor): minor: this can be `final` to avoid unintentional future re-assignment.
if (!Objects.isNull(bulkResponse) && doRetry && retryCount == 1) { // first attempt
Review comment (Contributor): Why does doRetry need to be true here? If it is false and some documents were successfully sent on the first attempt, we should increment the counter for those documents.

Review comment (Collaborator, author): @cmanning09 I guess you are right. I just refactored the existing code. It looks like even the existing code increments the stat only when canRetry is true. Modified the code to increment the stat even when doRetry is not true.
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() == null) {
sentDocumentsOnFirstAttemptCounter.increment();
}
}
}
if (doRetry && retryCount < maxRetries) {
handleRetry(bulkRequestForRetry, bulkResponse, backOffUtils);
bulkRequestNumberOfRetries.increment();
} else {
if (doRetry && retryCount >= maxRetries) {
e = new RuntimeException(String.format("Number of retries reached the limit of max retries(configured value %d)", maxRetries));
Review comment (Contributor): We should avoid re-assignment of method parameters. This can be dangerous and leads to confusing code.

Review comment (Collaborator, author): In this case, the caller gets the new exception returned too, so it gets passed back. Basically it is an in-and-out parameter to the function handleRetriesAndFailures.

Review comment (Contributor): If we need to return an object, we should use the standard mechanism for returning an object and return it as a return value. The current behavior is a side effect that happens conditionally and is not documented. I am concerned this will be problematic in the future.
}
if (Objects.isNull(e)) {
handleFailures(bulkRequestForRetry, bulkResponse.items());
} else {
handleFailures(bulkRequestForRetry, e);
}
bulkRequestFailedCounter.increment();
}
}

private void handleRetry(final AccumulatingBulkRequest request, final BulkResponse response,
final BackOffUtils backOffUtils, final boolean firstAttempt) throws InterruptedException {
final BackOffUtils backOffUtils) throws InterruptedException {
final AccumulatingBulkRequest<BulkOperation, BulkRequest> bulkRequestForRetry = createBulkRequestForRetry(request, response);
int retryCount = retryCountMap.get(bulkRequestForRetry);
if (backOffUtils.hasNext()) {
// Wait for backOff duration
backOffUtils.next();
final BulkResponse bulkResponse;
try {
bulkResponse = requestFunction.apply(bulkRequestForRetry);
} catch (final Exception e) {
} catch (Exception e) {
if (e instanceof OpenSearchException) {
int status = ((OpenSearchException) e).status().getStatus();
if (NOT_ALLOWED_ERRORS.contains(status)) {
@@ -178,47 +216,35 @@ private void handleRetry(final AccumulatingBulkRequest request, final BulkRespon
bulkRequestBadErrors.increment();
}
}

if (canRetry(e)) {
handleRetry(bulkRequestForRetry, null, backOffUtils, false);
bulkRequestNumberOfRetries.increment();
} else {
handleFailures(bulkRequestForRetry, e);
bulkRequestFailedCounter.increment();
}

handleRetriesAndFailures(bulkRequestForRetry, retryCount, backOffUtils, null, e);
return;
}
if (bulkResponse.errors()) {
if (canRetry(bulkResponse)) {
if (firstAttempt) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() == null) {
sentDocumentsOnFirstAttemptCounter.increment();
}
}
}
handleRetry(bulkRequestForRetry, bulkResponse, backOffUtils, false);
} else {
handleFailures(bulkRequestForRetry, bulkResponse.items());
}
handleRetriesAndFailures(bulkRequestForRetry, retryCount, backOffUtils, bulkResponse, null);
} else {
final int numberOfDocs = bulkRequestForRetry.getOperationsCount();
if (firstAttempt) {
if (retryCount == 1) {
Review comment (Member): It might be good to create a variable `boolean firstAttempt = retryCount == 1`. It is a little unclear in this function that retryCount = 1 actually means that there have been no retries and this is the first attempt.
sentDocumentsOnFirstAttemptCounter.increment(numberOfDocs);
}
sentDocumentsCounter.increment(bulkRequestForRetry.getOperationsCount());
retryCountMap.remove(bulkRequestForRetry);
}
}
}

private AccumulatingBulkRequest<BulkOperation, BulkRequest> createBulkRequestForRetry(
final AccumulatingBulkRequest<BulkOperation, BulkRequest> request, final BulkResponse response) {
int newCount = retryCountMap.containsKey(request) ? (retryCountMap.get(request) + 1) : 1;
if (response == null) {
retryCountMap.put(request, newCount);
// first attempt or retry due to Exception
return request;
} else {
final AccumulatingBulkRequest requestToReissue = bulkRequestSupplier.get();
if (request != requestToReissue) {
retryCountMap.put(requestToReissue, newCount);
retryCountMap.remove(request);
}
int index = 0;
for (final BulkResponseItem bulkItemResponse : response.items()) {
if (bulkItemResponse.error() != null) {
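The per-request retry accounting introduced above can be sketched in isolation. The types here are simplified stand-ins (the real code keys the map by `AccumulatingBulkRequest`), but the bookkeeping pattern is the same: bump the count on each attempt, and carry it over when a partial failure causes a smaller request to be reissued under a new key.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the retry-count bookkeeping in BulkRetryStrategy:
// the count is kept in a map keyed by request identity, and moved to the
// new key when a partial failure causes a smaller request to be reissued.
public class RetryCountSketch {
    static final class Request {}  // stand-in for AccumulatingBulkRequest

    private final Map<Request, Integer> retryCountMap = new HashMap<>();

    // Mirrors createBulkRequestForRetry: increment the count, and if the
    // request was rebuilt (retryRequest != original), transfer the count.
    Request recordAttempt(Request original, Request retryRequest) {
        int newCount = retryCountMap.containsKey(original) ? retryCountMap.get(original) + 1 : 1;
        if (retryRequest != original) {
            retryCountMap.remove(original);
        }
        retryCountMap.put(retryRequest, newCount);
        return retryRequest;
    }

    int countFor(Request r) {
        return retryCountMap.getOrDefault(r, 0);
    }

    void onSuccess(Request r) {
        retryCountMap.remove(r);  // avoid leaking entries for completed requests
    }
}
```

Removing the entry on success matters because the map would otherwise grow for the lifetime of the strategy; the diff above does the same via `retryCountMap.remove(bulkRequestForRetry)`.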
@@ -138,10 +138,12 @@ private void doInitializeInternal() throws IOException {
OpenSearchTransport transport = new RestClientTransport(restHighLevelClient.getLowLevelClient(), new PreSerializedJsonpMapper());
openSearchClient = new OpenSearchClient(transport);
bulkRequestSupplier = () -> new JavaClientAccumulatingBulkRequest(new BulkRequest.Builder());
final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Review comment (Contributor): Let's update the README.md file with this new configuration option.
bulkRetryStrategy = new BulkRetryStrategy(
bulkRequest -> openSearchClient.bulk(bulkRequest.getRequest()),
this::logFailure,
pluginMetrics,
maxRetries,
bulkRequestSupplier);

objectMapper = new ObjectMapper();
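The wiring above, reading `max_retries` from the sink configuration and passing it into the retry strategy's constructor, can be sketched as follows. The types are hypothetical simplified stand-ins, not Data Prepper classes:

```java
// Simplified sketch of injecting a configured retry limit into a retry
// strategy at construction time, as the sink does above. Types are
// hypothetical stand-ins for illustration only.
public class WiringSketch {
    static class RetryStrategy {
        private final int maxRetries;
        RetryStrategy(int maxRetries) { this.maxRetries = maxRetries; }
        int maxRetries() { return maxRetries; }
    }

    static class SinkConfig {
        private final Integer maxRetriesSetting;  // null when max_retries was omitted
        SinkConfig(Integer maxRetriesSetting) { this.maxRetriesSetting = maxRetriesSetting; }
        int maxRetries() {
            // Fall back to the documented default when the user did not set a limit.
            return maxRetriesSetting != null ? maxRetriesSetting : Integer.MAX_VALUE;
        }
    }

    static RetryStrategy buildStrategy(SinkConfig config) {
        return new RetryStrategy(config.maxRetries());
    }
}
```

Passing the limit as a constructor argument keeps the strategy immutable with respect to its retry policy, which is the design choice the diff takes.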
@@ -11,29 +11,45 @@

public class RetryConfiguration {
public static final String DLQ_FILE = "dlq_file";
public static final String MAX_RETRIES = "max_retries";

private final String dlqFile;
private final int maxRetries;

public String getDlqFile() {
return dlqFile;
}

public int getMaxRetries() {
if (maxRetries < 1) {
throw new IllegalArgumentException("max_retries must be at least 1");
}
return maxRetries;
}

public static class Builder {
private String dlqFile;
private int maxRetries = Integer.MAX_VALUE;

public Builder withDlqFile(final String dlqFile) {
checkNotNull(dlqFile, "dlqFile cannot be null.");
this.dlqFile = dlqFile;
return this;
}

public Builder withMaxRetries(final Integer maxRetries) {
Review comment (Contributor): Can we provide more min and max validations around maxRetries? There are some edge cases like negative numbers and Integer.MAX_VALUE that I am not convinced we should support.

Review comment (Collaborator, author, @kkondaka, Mar 6, 2023): Integer.MAX_VALUE should be OK to support indefinite retries.
checkNotNull(maxRetries, "maxRetries cannot be null.");
this.maxRetries = maxRetries;
return this;
}
public RetryConfiguration build() {
return new RetryConfiguration(this);
}
}

private RetryConfiguration(final Builder builder) {
this.dlqFile = builder.dlqFile;
this.maxRetries = builder.maxRetries;
}

public static RetryConfiguration readRetryConfig(final PluginSetting pluginSetting) {
Expand All @@ -42,6 +58,10 @@ public static RetryConfiguration readRetryConfig(final PluginSetting pluginSetti
if (dlqFile != null) {
builder = builder.withDlqFile(dlqFile);
}
final Integer maxRetries = (Integer) pluginSetting.getAttributeFromSettings(MAX_RETRIES);
if (maxRetries != null) {
builder = builder.withMaxRetries(maxRetries);
}
return builder.build();
}
}
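A minimal, self-contained re-implementation in the same shape as `RetryConfiguration` shows how the builder default and the validation in `getMaxRetries` behave together. This mirrors the structure of the class above for illustration; it is not the plugin's code:

```java
// Self-contained illustration of the builder/validation pattern used by
// RetryConfiguration above. Mirrors the shape of the real class but is
// not the plugin's code.
public class RetryConfigSketch {
    private final int maxRetries;

    private RetryConfigSketch(Builder builder) {
        this.maxRetries = builder.maxRetries;
    }

    public int getMaxRetries() {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("max_retries must be at least 1");
        }
        return maxRetries;
    }

    public static class Builder {
        private int maxRetries = Integer.MAX_VALUE;  // default: retry (effectively) forever

        public Builder withMaxRetries(int maxRetries) {
            this.maxRetries = maxRetries;
            return this;
        }

        public RetryConfigSketch build() {
            return new RetryConfigSketch(this);
        }
    }

    public static void main(String[] args) {
        int retries = new Builder().withMaxRetries(5).build().getMaxRetries();
        System.out.println(retries);  // 5
    }
}
```

Note that validating in the getter rather than in `build()` means an invalid value only surfaces when the limit is first read, which is also how the diff above behaves.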
@@ -19,6 +19,7 @@ public class JavaClientAccumulatingBulkRequest implements AccumulatingBulkReques
private BulkRequest.Builder bulkRequestBuilder;
private long currentBulkSize = 0L;
private int operationCount = 0;
private int retryCount = 0;
Review comment (Member): I think this was left behind from the previous revision.
private BulkRequest builtRequest;

public JavaClientAccumulatingBulkRequest(BulkRequest.Builder bulkRequestBuilder) {