diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e474e1132a..00d52d41c60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### 5.0-SNAPSHOT #### Bugs +* Fix: Reliability improvements to watchers * Fix #2592: ConcurrentModificationException in CRUD KubernetesMockServer * Fix #2519: Generated schemas contains a valid meta-schema URI reference (`http://json-schema.org/draft-05/schema#`) * Fix #2631: Handle null values when getting current context on OIDC interceptors diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java index a0ea87880c1..675a88ca52a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcher.java @@ -48,6 +48,8 @@ public void eventReceived(Action action, T resource) { case DELETED: if (condition.test(null)) { future.complete(null); + } else { + future.completeExceptionally(new WatcherException("Unexpected deletion of watched resource, will never satisfy condition")); } break; case ERROR: diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 2062aef63ec..eb172cd4607 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -98,25 +98,16 @@ private void runWatch() { String fieldQueryString = baseOperation.getFieldQueryParam(); String name = baseOperation.getName(); - // for API groups we can use the name in the path rather than a fieldSelector - // which is more likely to work well for API Groups if (name != null && name.length() > 0) { - if (baseOperation.isApiGroup()) { - httpUrlBuilder.addPathSegment(name); - } else { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; + if (fieldQueryString.length() > 0) { + fieldQueryString += ","; } + fieldQueryString += "metadata.name=" + name; } if (Utils.isNotNullOrEmpty(fieldQueryString)) { - if (baseOperation.isApiGroup()) { - logger.warn("Ignoring field selector " + fieldQueryString + " on watch URI " + requestUrl + " as fieldSelector is not yet supported on API Groups APIs"); - } else { - httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); - } + httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString); } + listOptions.setResourceVersion(resourceVersion.get()); HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); @@ -244,6 +235,8 @@ public void onMessage(WebSocket webSocket, String message) { logger.error("Received wrong type of object for watch", e); } catch (IllegalArgumentException e) { logger.error("Invalid event type", e); + } catch (Throwable e) { + logger.error("Unhandled exception encountered in watcher event handler", e); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 03cda81a0eb..1a7acb9be38 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -16,6 +16,19 @@ package io.fabric8.kubernetes.client.dsl.internal; +import static java.net.HttpURLConnection.HTTP_GONE; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; import io.fabric8.kubernetes.api.model.KubernetesResourceList; @@ -40,18 +53,6 @@ import okhttp3.Response; import okhttp3.logging.HttpLoggingInterceptor; import okio.BufferedSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static java.net.HttpURLConnection.HTTP_GONE; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); @@ -101,8 +102,6 @@ public WatchHTTPManager(final OkHttpClient client, } private void runWatch() { - logger.debug("Watching via HTTP GET ... {}", this); - HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); String labelQueryParam = baseOperation.getLabelQueryParam(); if (Utils.isNotNullOrEmpty(labelQueryParam)) { @@ -112,17 +111,11 @@ private void runWatch() { String fieldQueryString = baseOperation.getFieldQueryParam(); String name = baseOperation.getName(); - // for API groups we can use the name in the path rather than a fieldSelector - // which is more likely to work well for API Groups if (name != null && name.length() > 0) { - if (baseOperation.isApiGroup()) { - httpUrlBuilder.addPathSegment(name); - } else { - if (fieldQueryString.length() > 0) { - fieldQueryString += ","; - } - fieldQueryString += "metadata.name=" + name; + if (fieldQueryString.length() > 0) { + fieldQueryString += ","; } + fieldQueryString += "metadata.name=" + name; } if (Utils.isNotNullOrEmpty(fieldQueryString)) { @@ -133,12 +126,16 @@ private void runWatch() { HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions); String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost(); if (requestUrl.getPort() != -1) { - origin += ":" + requestUrl.getPort(); + origin += ":" + requestUrl.getPort(); } + HttpUrl url = httpUrlBuilder.build(); + + logger.debug("Watching via HTTP GET {}", url); + final Request request = new Request.Builder() .get() - .url(httpUrlBuilder.build()) + .url(url) .addHeader("Origin", origin) .build(); @@ -146,22 +143,26 @@ private void runWatch() { @Override public void onFailure(Call call, IOException e) { logger.info("Watch connection failed. reason: {}", e.getMessage()); - scheduleReconnect(); + scheduleReconnect(true); } @Override public void onResponse(Call call, Response response) throws IOException { if (!response.isSuccessful()) { - throw OperationSupport.requestFailure(request, - OperationSupport.createStatus(response.code(), response.message())); + onStatus(OperationSupport.createStatus(response.code(), response.message())); } + boolean shouldBackoff = true; + try { BufferedSource source = response.body().source(); while (!source.exhausted()) { String message = source.readUtf8LineStrict(); onMessage(message); } + // the normal operation of a long poll get is to return once a response is available. + // in that case we should reconnect immediately. + shouldBackoff = false; } catch (Exception e) { logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); } @@ -171,12 +172,13 @@ public void onResponse(Call call, Response response) throws IOException { if (response != null) { response.body().close(); } - scheduleReconnect(); + + scheduleReconnect(shouldBackoff); } }); } - private void scheduleReconnect() { + private void scheduleReconnect(boolean shouldBackoff) { if (forceClosed.get()) { logger.warn("Ignoring error for already closed/closing connection"); return; @@ -188,6 +190,7 @@ private void scheduleReconnect() { } logger.debug("Submitting reconnect task to the executor"); + // make sure that whichever thread calls this method, the tasks are // performed serially in the executor. submit(() -> { @@ -199,6 +202,11 @@ private void scheduleReconnect() { // actual reconnect only after the back-off time has passed, without // blocking the thread logger.debug("Scheduling reconnect task"); + + long delay = shouldBackoff + ? nextReconnectInterval() + : 0; + schedule(() -> { try { WatchHTTPManager.this.runWatch(); @@ -209,7 +217,7 @@ private void scheduleReconnect() { close(); watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - }, nextReconnectInterval(), TimeUnit.MILLISECONDS); + }, delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { // This is a standard exception if we close the scheduler. We should not print it if (!forceClosed.get()) { @@ -243,18 +251,7 @@ public void onMessage(String messageSource) { } } } else if (object instanceof Status) { - Status status = (Status) object; - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - // exception - // shut down executor, etc. - close(); - watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - return; - } - - watcher.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); + onStatus((Status) object); } else { logger.error("Unknown message received: {}", messageSource); } @@ -265,6 +262,21 @@ public void onMessage(String messageSource) { } } + private void onStatus(Status status) { + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + // exception + // shut down executor, etc. + close(); + watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + return; + } + + watcher.eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status.toString()); + } + + protected static WatchEvent readWatchEvent(String messageSource) { WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); KubernetesResource object = null; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java index af0d2afdb8c..f3dfb36e16f 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/base/WaitForConditionWatcherTest.java @@ -51,7 +51,7 @@ void itCompletesOnMatchAdded() throws Exception { watcher.eventReceived(Action.ADDED, configMap); assertTrue(watcher.getFuture().isDone()); assertEquals(watcher.getFuture().get(), configMap); - condition.isCalledWith(configMap); + assertTrue(condition.isCalledWith(configMap)); } @Test @@ -61,7 +61,7 @@ void itCompletesOnMatchModified() throws Exception { watcher.eventReceived(Action.MODIFIED, configMap); assertTrue(watcher.getFuture().isDone()); assertEquals(watcher.getFuture().get(), configMap); - condition.isCalledWith(configMap); + assertTrue(condition.isCalledWith(configMap)); } @Test @@ -71,7 +71,7 @@ void itCompletesOnMatchDeleted() throws Exception { watcher.eventReceived(Action.DELETED, configMap); assertTrue(watcher.getFuture().isDone()); assertNull(watcher.getFuture().get()); - condition.isCalledWith(null); + assertTrue(condition.isCalledWith(null)); } @Test @@ -80,7 +80,7 @@ void itDoesNotCompleteOnNoMatchAdded() { WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); watcher.eventReceived(Action.ADDED, configMap); assertFalse(watcher.getFuture().isDone()); - condition.isCalledWith(configMap); + assertTrue(condition.isCalledWith(configMap)); } @Test @@ -89,16 +89,23 @@ void itDoesNotCompleteOnNoMatchModified() { WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); watcher.eventReceived(Action.MODIFIED, configMap); assertFalse(watcher.getFuture().isDone()); - condition.isCalledWith(configMap); + assertTrue(condition.isCalledWith(configMap)); } @Test - void itDoesNotCompleteOnNoMatchDeleted() { + void itCompletesExceptionallyOnUnexpectedDeletion() throws Exception { TrackingPredicate condition = condition(Objects::nonNull); WaitForConditionWatcher watcher = new WaitForConditionWatcher<>(condition); watcher.eventReceived(Action.DELETED, configMap); - assertFalse(watcher.getFuture().isDone()); - condition.isCalledWith(null); + assertTrue(watcher.getFuture().isDone()); + try { + watcher.getFuture().get(); + fail("should have thrown exception"); + } catch (ExecutionException e) { + assertEquals(WatcherException.class, e.getCause().getClass()); + assertEquals("Unexpected deletion of watched resource, will never satisfy condition", e.getCause().getMessage()); + } + assertTrue(condition.isCalledWith(null)); } @Test