Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Improve waiting strategy for Integration tests - cont #409

Merged
merged 3 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
package com.redhat.parodos.sdkutils;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.Callable;

import com.google.common.base.Strings;
import com.redhat.parodos.notification.sdk.api.ApiCallback;
import com.redhat.parodos.notification.sdk.api.ApiClient;
import com.redhat.parodos.notification.sdk.api.ApiException;
import com.redhat.parodos.notification.sdk.api.Configuration;
import com.redhat.parodos.notification.sdk.api.NotificationRecordApi;
import com.redhat.parodos.notification.sdk.model.PageNotificationRecordResponseDTO;
import com.redhat.parodos.workflow.utils.CredUtils;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.core.env.MissingRequiredPropertiesException;
Expand All @@ -37,8 +27,7 @@ private NotificationServiceUtils() {
* in environment variables.
* @return the ApiClient
*/
public static ApiClient getParodosAPiClient()
throws ApiException, MissingRequiredPropertiesException, InterruptedException {
public static ApiClient getParodosAPiClient() throws MissingRequiredPropertiesException {
ApiClient apiClient = Configuration.getDefaultApiClient();
String serverIp = Optional.ofNullable(System.getenv("NOTIFICATION_SERVICE_HOST")).orElse("localhost");
String serverPort = Optional.ofNullable(System.getenv("SERVER_PORT")).orElse("8081");
Expand All @@ -63,138 +52,23 @@ public static ApiClient getParodosAPiClient()

/**
* Invokes @see
* com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotificationsAsync(Pageable,
* String, String, ApiCallback<PageNotificationRecordResponseDTO>) and retries for 60
* seconds.
* com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotifications(Pageable,
* String, String) and retries for 120 seconds.
* @param apiclient the API Client
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static void waitNotificationStart(ApiClient apiclient) throws ApiException, InterruptedException {
public static void waitNotificationStart(ApiClient apiclient) {
NotificationRecordApi notificationRecordApi = new NotificationRecordApi(apiclient);
waitAsyncResponse(new FuncExecutor<PageNotificationRecordResponseDTO>() {
@Override
public boolean check(PageNotificationRecordResponseDTO result, int statusCode) {
return statusCode != 200;
}
try (var executorService = new RetryExecutorService<Void>()) {
Callable<Void> task = () -> {
notificationRecordApi.getNotifications(0, 10, null, null, null);
return null;
};

@Override
public void execute(@NonNull ApiCallback<PageNotificationRecordResponseDTO> callback) throws ApiException {
notificationRecordApi.getNotificationsAsync(0, 10, null, null, null, callback);
}
});
}

/**
* Executes a @see FuncExecutor. Waits at most 60 seconds for a successful result of
* an async API invocation.
* @param f the @see FuncExecutor
* @param <T> the type of the function executor
* @return @see AsyncResult
* @throws ApiException if the api invocation fails
* @throws InterruptedException If the async call reaches the waiting timeout
*/
public static <T> T waitAsyncResponse(FuncExecutor<T> f) throws ApiException, InterruptedException {
AsyncResult<T> asyncResult = new AsyncResult<>();
Lock lock = new ReentrantLock();
Condition response = lock.newCondition();
ApiCallback<T> apiCallback = new ApiCallback<T>() {

@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
log.info("onFailure {}", e.getMessage());
try {
f.execute(this);
}
catch (ApiException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}

@Override
public void onSuccess(T result, int statusCode, Map<String, List<String>> responseHeaders) {
if (f.check(result, statusCode)) {
try {
f.execute(this);
}
catch (ApiException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}
else {
asyncResult.setStatusCode(statusCode);
asyncResult.setResult(result);
asyncResult.setError(null);
signal();
}
}

@Override
public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
}

@Override
public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
}

private void signal() {
lock.lock();
try {
response.signal();
}
finally {
lock.unlock();
}
}
};
f.execute(apiCallback);
lock.lock();
try {
// should be more than enough
response.await(60, TimeUnit.SECONDS);
if (asyncResult.getError() != null) {
throw new ApiException(
"An error occurred while executing waitAsyncResponse: " + asyncResult.getError());
}
executorService.submitWithRetry(task);
}
finally {
lock.unlock();
catch (Exception e) {
throw new RuntimeException("Notification Record API is not up and running", e);
}
return asyncResult.getResult();
}

public interface FuncExecutor<T> {

/**
* Defines the @see ApiCallback to execute
* @param callback the
* @throws ApiException If the API callback invocation fails
*/
void execute(@NonNull ApiCallback<T> callback) throws ApiException;

/**
* Define when considering an ApiCallback result as successful.
* @param result the result to check
* @return {true} if it is necessary to continue monitoring the result, {false}
* when it's possible to stop the monitoring.
*/
default boolean check(T result, int statusCode) {
return true;
}

}

@Data
private static class AsyncResult<T> {

private String error;

T result;

int statusCode;

}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
package com.redhat.parodos.sdkutils;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RetryExecutorService<T> implements AutoCloseable {

private final ExecutorService executor;
private static final int MAX_RETRY_TIME = 2 * 60 * 1000; // 2 minutes

public static final int RETRY_DELAY = 5 * 1000; // 5 seconds

private final ScheduledExecutorService scheduledExecutor;

public RetryExecutorService() {
executor = Executors.newFixedThreadPool(1);
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
}

/**
Expand All @@ -22,7 +28,7 @@ public RetryExecutorService() {
*/
public T submitWithRetry(Callable<T> task) {
// @formatter:off
return submitWithRetry(task, () -> {}, () -> {}, 10 * 60 * 1000, 5000);
return submitWithRetry(task, () -> {}, () -> {}, MAX_RETRY_TIME, RETRY_DELAY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this (and the commit message ;) )

// @formatter:on
}

Expand All @@ -37,48 +43,41 @@ public T submitWithRetry(Callable<T> task) {
*/
public T submitWithRetry(Callable<T> task, Runnable onSuccess, Runnable onFailure, long maxRetryTime,
long retryDelay) {
Future<T> future = executor.submit(() -> {
long startTime = System.currentTimeMillis();
long endTime = startTime + maxRetryTime;

while (System.currentTimeMillis() < endTime) {
try {
T result = task.call();
onSuccess.run();
return result; // Success, no need to retry
}
catch (Exception e) {
// Task failed, invoke onFailure callback
onFailure.run();
CompletableFuture<T> future = new CompletableFuture<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this future here needed?

Copy link
Collaborator Author

@masayag masayag Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it stores the result and is used as a reference to its success or failure.
there are 2 "futures" here: one is responsible for the retries which are the scheduledFuture.
the other one is used to report the result in line 67.
would you suggest a different method of doing it?

long startTime = System.currentTimeMillis();
long endTime = startTime + maxRetryTime;

// Sleep for the retry delay
try {
// FIXME: This is a blocking call, we should use a non-blocking
// sleep
Thread.sleep(retryDelay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return null; // Interrupted, exit the task
}
}
ScheduledFuture<?> scheduledFuture = scheduledExecutor.scheduleWithFixedDelay(() -> {
if (System.currentTimeMillis() >= endTime) {
future.completeExceptionally(new TimeoutException("Retry limit reached."));
return;
}
Comment on lines +51 to 54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set a timeout in future.get() (i.e. future.get(endTime, TimeUnit.MILLISECONDS); does it have equivalent behavior to this block of code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, setting a timeout using future.get(endTime, TimeUnit.MILLISECONDS) does not provide an equivalent behavior to the previous block of code in the RetryExecutorService.

The future.get(endTime, TimeUnit.MILLISECONDS) method call sets a timeout for waiting to retrieve the result from the future. If the result is not available within the specified timeout period, a TimeoutException will be thrown. However, it does not cancel the ongoing execution of the task or stop further retry attempts.


return null; // Retry limit reached
});
try {
T result = task.call();
onSuccess.run();
future.complete(result); // Success, complete the future with the result
}
catch (Exception e) {
onFailure.run();
}
}, 0, retryDelay, TimeUnit.MILLISECONDS);

try {
return future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
finally {
scheduledFuture.cancel(false);
scheduledExecutor.shutdown();
}
}

@Override
public void close() throws Exception {
executor.shutdown();
boolean awaited = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
boolean awaited = scheduledExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (!awaited) {
throw new RuntimeException("Failed to await termination of executor service");
}
Expand Down