diff --git a/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/NotificationServiceUtils.java b/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/NotificationServiceUtils.java index c00609d6e..0a9e94a1c 100644 --- a/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/NotificationServiceUtils.java +++ b/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/NotificationServiceUtils.java @@ -1,23 +1,13 @@ package com.redhat.parodos.sdkutils; -import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.Callable; import com.google.common.base.Strings; -import com.redhat.parodos.notification.sdk.api.ApiCallback; import com.redhat.parodos.notification.sdk.api.ApiClient; -import com.redhat.parodos.notification.sdk.api.ApiException; import com.redhat.parodos.notification.sdk.api.Configuration; import com.redhat.parodos.notification.sdk.api.NotificationRecordApi; -import com.redhat.parodos.notification.sdk.model.PageNotificationRecordResponseDTO; import com.redhat.parodos.workflow.utils.CredUtils; -import lombok.Data; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.core.env.MissingRequiredPropertiesException; @@ -37,8 +27,7 @@ private NotificationServiceUtils() { * in environment variables. * @return the ApiClient */ - public static ApiClient getParodosAPiClient() - throws ApiException, MissingRequiredPropertiesException, InterruptedException { + public static ApiClient getParodosAPiClient() throws MissingRequiredPropertiesException { ApiClient apiClient = Configuration.getDefaultApiClient(); String serverIp = Optional.ofNullable(System.getenv("NOTIFICATION_SERVICE_HOST")).orElse("localhost"); String serverPort = Optional.ofNullable(System.getenv("SERVER_PORT")).orElse("8081"); @@ -63,138 +52,23 @@ public static ApiClient getParodosAPiClient() /** * Invokes @see - * com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotificationsAsync(Pageable, - * String, String, ApiCallback) and retries for 60 - * seconds. + * com.redhat.parodos.notification.sdk.api.NotificationRecordApi#getNotifications(Pageable, + * String, String) and retries for 120 seconds. * @param apiclient the API Client - * @throws InterruptedException If the async call reaches the waiting timeout - * @throws ApiException If the API method invocation fails */ - public static void waitNotificationStart(ApiClient apiclient) throws ApiException, InterruptedException { + public static void waitNotificationStart(ApiClient apiclient) { NotificationRecordApi notificationRecordApi = new NotificationRecordApi(apiclient); - waitAsyncResponse(new FuncExecutor() { - @Override - public boolean check(PageNotificationRecordResponseDTO result, int statusCode) { - return statusCode != 200; - } + try (var executorService = new RetryExecutorService()) { + Callable task = () -> { + notificationRecordApi.getNotifications(0, 10, null, null, null); + return null; + }; - @Override - public void execute(@NonNull ApiCallback callback) throws ApiException { - notificationRecordApi.getNotificationsAsync(0, 10, null, null, null, callback); - } - }); - } - - /** - * Executes a @see FuncExecutor. Waits at most 60 seconds for a successful result of - * an async API invocation. - * @param f the @see FuncExecutor - * @param the type of the function executor - * @return @see AsyncResult - * @throws ApiException if the api invocation fails - * @throws InterruptedException If the async call reaches the waiting timeout - */ - public static T waitAsyncResponse(FuncExecutor f) throws ApiException, InterruptedException { - AsyncResult asyncResult = new AsyncResult<>(); - Lock lock = new ReentrantLock(); - Condition response = lock.newCondition(); - ApiCallback apiCallback = new ApiCallback() { - - @Override - public void onFailure(ApiException e, int statusCode, Map> responseHeaders) { - log.info("onFailure {}", e.getMessage()); - try { - f.execute(this); - } - catch (ApiException apie) { - asyncResult.setError(apie.getMessage()); - signal(); - } - } - - @Override - public void onSuccess(T result, int statusCode, Map> responseHeaders) { - if (f.check(result, statusCode)) { - try { - f.execute(this); - } - catch (ApiException apie) { - asyncResult.setError(apie.getMessage()); - signal(); - } - } - else { - asyncResult.setStatusCode(statusCode); - asyncResult.setResult(result); - asyncResult.setError(null); - signal(); - } - } - - @Override - public void onUploadProgress(long bytesWritten, long contentLength, boolean done) { - } - - @Override - public void onDownloadProgress(long bytesRead, long contentLength, boolean done) { - } - - private void signal() { - lock.lock(); - try { - response.signal(); - } - finally { - lock.unlock(); - } - } - }; - f.execute(apiCallback); - lock.lock(); - try { - // should be more than enough - response.await(60, TimeUnit.SECONDS); - if (asyncResult.getError() != null) { - throw new ApiException( - "An error occurred while executing waitAsyncResponse: " + asyncResult.getError()); - } + executorService.submitWithRetry(task); } - finally { - lock.unlock(); + catch (Exception e) { + throw new RuntimeException("Notification Record API is not up and running", e); } - return asyncResult.getResult(); - } - - public interface FuncExecutor { - - /** - * Defines the @see ApiCallback to execute - * @param callback the - * @throws ApiException If the API callback invocation fails - */ - void execute(@NonNull ApiCallback callback) throws ApiException; - - /** - * Define when considering an ApiCallback result as successful. - * @param result the result to check - * @return {true} if it is necessary to continue monitoring the result, {false} - * when it's possible to stop the monitoring. - */ - default boolean check(T result, int statusCode) { - return true; - } - - } - - @Data - private static class AsyncResult { - - private String error; - - T result; - - int statusCode; - } } diff --git a/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/RetryExecutorService.java b/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/RetryExecutorService.java index 5e4233c1f..6be878fc3 100644 --- a/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/RetryExecutorService.java +++ b/sdk-utils/src/main/java/com/redhat/parodos/sdkutils/RetryExecutorService.java @@ -1,18 +1,24 @@ package com.redhat.parodos.sdkutils; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class RetryExecutorService implements AutoCloseable { - private final ExecutorService executor; + private static final int MAX_RETRY_TIME = 2 * 60 * 1000; // 2 minutes + + public static final int RETRY_DELAY = 5 * 1000; // 5 seconds + + private final ScheduledExecutorService scheduledExecutor; public RetryExecutorService() { - executor = Executors.newFixedThreadPool(1); + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); } /** @@ -22,7 +28,7 @@ public RetryExecutorService() { */ public T submitWithRetry(Callable task) { // @formatter:off - return submitWithRetry(task, () -> {}, () -> {}, 10 * 60 * 1000, 5000); + return submitWithRetry(task, () -> {}, () -> {}, MAX_RETRY_TIME, RETRY_DELAY); // @formatter:on } @@ -37,35 +43,25 @@ public T submitWithRetry(Callable task) { */ public T submitWithRetry(Callable task, Runnable onSuccess, Runnable onFailure, long maxRetryTime, long retryDelay) { - Future future = executor.submit(() -> { - long startTime = System.currentTimeMillis(); - long endTime = startTime + maxRetryTime; - - while (System.currentTimeMillis() < endTime) { - try { - T result = task.call(); - onSuccess.run(); - return result; // Success, no need to retry - } - catch (Exception e) { - // Task failed, invoke onFailure callback - onFailure.run(); + CompletableFuture future = new CompletableFuture<>(); + long startTime = System.currentTimeMillis(); + long endTime = startTime + maxRetryTime; - // Sleep for the retry delay - try { - // FIXME: This is a blocking call, we should use a non-blocking - // sleep - Thread.sleep(retryDelay); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return null; // Interrupted, exit the task - } - } + ScheduledFuture scheduledFuture = scheduledExecutor.scheduleWithFixedDelay(() -> { + if (System.currentTimeMillis() >= endTime) { + future.completeExceptionally(new TimeoutException("Retry limit reached.")); + return; } - return null; // Retry limit reached - }); + try { + T result = task.call(); + onSuccess.run(); + future.complete(result); // Success, complete the future with the result + } + catch (Exception e) { + onFailure.run(); + } + }, 0, retryDelay, TimeUnit.MILLISECONDS); try { return future.get(); @@ -73,12 +69,15 @@ public T submitWithRetry(Callable task, Runnable onSuccess, Runnable onFailur catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } + finally { + scheduledFuture.cancel(false); + scheduledExecutor.shutdown(); + } } @Override public void close() throws Exception { - executor.shutdown(); - boolean awaited = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + boolean awaited = scheduledExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); if (!awaited) { throw new RuntimeException("Failed to await termination of executor service"); }