Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Replace async invocation with RetryExecutorService
Browse files Browse the repository at this point in the history
The PR modifies the notification service integration tests to use the
same RetryExecutorService class for waiting on the notification service
to become available.

Signed-off-by: Moti Asayag <masayag@redhat.com>
  • Loading branch information
masayag committed Jun 7, 2023
1 parent 1eada4a commit 80e0948
Showing 1 changed file with 13 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
package com.redhat.parodos.sdkutils;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.Callable;

import com.google.common.base.Strings;
import com.redhat.parodos.notification.sdk.api.ApiCallback;
import com.redhat.parodos.notification.sdk.api.ApiClient;
import com.redhat.parodos.notification.sdk.api.ApiException;
import com.redhat.parodos.notification.sdk.api.Configuration;
import com.redhat.parodos.notification.sdk.api.NotificationRecordApi;
import com.redhat.parodos.notification.sdk.model.PageNotificationRecordResponseDTO;
import com.redhat.parodos.workflow.utils.CredUtils;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.core.env.MissingRequiredPropertiesException;
Expand All @@ -37,8 +27,7 @@ private NotificationServiceUtils() {
* in environment variables.
* @return the ApiClient
*/
public static ApiClient getParodosAPiClient()
throws ApiException, MissingRequiredPropertiesException, InterruptedException {
public static ApiClient getParodosAPiClient() throws MissingRequiredPropertiesException {
ApiClient apiClient = Configuration.getDefaultApiClient();
String serverIp = Optional.ofNullable(System.getenv("NOTIFICATION_SERVICE_HOST")).orElse("localhost");
String serverPort = Optional.ofNullable(System.getenv("SERVER_PORT")).orElse("8081");
Expand All @@ -63,138 +52,23 @@ public static ApiClient getParodosAPiClient()

/**
* Invokes @see
* com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotificationsAsync(Pageable,
* String, String, ApiCallback<PageNotificationRecordResponseDTO>) and retries for 60
* seconds.
* com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotifications(Pageable,
* String, String) and retries for 120 seconds.
* @param apiclient the API Client
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static void waitNotificationStart(ApiClient apiclient) throws ApiException, InterruptedException {
public static void waitNotificationStart(ApiClient apiclient) {
NotificationRecordApi notificationRecordApi = new NotificationRecordApi(apiclient);
waitAsyncResponse(new FuncExecutor<PageNotificationRecordResponseDTO>() {
@Override
public boolean check(PageNotificationRecordResponseDTO result, int statusCode) {
return statusCode != 200;
}
try (var executorService = new RetryExecutorService<Void>()) {
Callable<Void> task = () -> {
notificationRecordApi.getNotifications(0, 10, null, null, null);
return null;
};

@Override
public void execute(@NonNull ApiCallback<PageNotificationRecordResponseDTO> callback) throws ApiException {
notificationRecordApi.getNotificationsAsync(0, 10, null, null, null, callback);
}
});
}

/**
* Executes a @see FuncExecutor. Waits at most 60 seconds for a successful result of
* an async API invocation.
* @param f the @see FuncExecutor
* @param <T> the type of the function executor
* @return @see AsyncResult
* @throws ApiException if the api invocation fails
* @throws InterruptedException If the async call reaches the waiting timeout
*/
public static <T> T waitAsyncResponse(FuncExecutor<T> f) throws ApiException, InterruptedException {
AsyncResult<T> asyncResult = new AsyncResult<>();
Lock lock = new ReentrantLock();
Condition response = lock.newCondition();
ApiCallback<T> apiCallback = new ApiCallback<T>() {

@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
log.info("onFailure {}", e.getMessage());
try {
f.execute(this);
}
catch (ApiException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}

@Override
public void onSuccess(T result, int statusCode, Map<String, List<String>> responseHeaders) {
if (f.check(result, statusCode)) {
try {
f.execute(this);
}
catch (ApiException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}
else {
asyncResult.setStatusCode(statusCode);
asyncResult.setResult(result);
asyncResult.setError(null);
signal();
}
}

@Override
public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
}

@Override
public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
}

private void signal() {
lock.lock();
try {
response.signal();
}
finally {
lock.unlock();
}
}
};
f.execute(apiCallback);
lock.lock();
try {
// should be more than enough
response.await(60, TimeUnit.SECONDS);
if (asyncResult.getError() != null) {
throw new ApiException(
"An error occurred while executing waitAsyncResponse: " + asyncResult.getError());
}
executorService.submitWithRetry(task);
}
finally {
lock.unlock();
catch (Exception e) {
throw new RuntimeException("Notification Record API is not up and running", e);
}
return asyncResult.getResult();
}

public interface FuncExecutor<T> {

/**
* Defines the @see ApiCallback to execute
* @param callback the
* @throws ApiException If the API callback invocation fails
*/
void execute(@NonNull ApiCallback<T> callback) throws ApiException;

/**
* Define when considering an ApiCallback result as successful.
* @param result the result to check
* @return {true} if it is necessary to continue monitoring the result, {false}
* when it's possible to stop the monitoring.
*/
default boolean check(T result, int statusCode) {
return true;
}

}

@Data
private static class AsyncResult<T> {

private String error;

T result;

int statusCode;

}

}

0 comments on commit 80e0948

Please sign in to comment.