Skip to content

Commit

Permalink
Merge pull request #2671 from HubSpot/watcher-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fusesource-ci committed Dec 17, 2020
2 parents 50fbea7 + 283716d commit c768aa4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 5.0-SNAPSHOT

#### Bugs
* Fix: Reliability improvements to watchers
* Fix #2592: ConcurrentModificationException in CRUD KubernetesMockServer
* Fix #2519: Generated schemas contains a valid meta-schema URI reference (`http://json-schema.org/draft-05/schema#`)
* Fix #2631: Handle null values when getting current context on OIDC interceptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public void eventReceived(Action action, T resource) {
case DELETED:
if (condition.test(null)) {
future.complete(null);
} else {
future.completeExceptionally(new WatcherException("Unexpected deletion of watched resource, will never satisfy condition"));
}
break;
case ERROR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,25 +98,16 @@ private void runWatch() {
String fieldQueryString = baseOperation.getFieldQueryParam();
String name = baseOperation.getName();

// for API groups we can use the name in the path rather than a fieldSelector
// which is more likely to work well for API Groups
if (name != null && name.length() > 0) {
if (baseOperation.isApiGroup()) {
httpUrlBuilder.addPathSegment(name);
} else {
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
}
if (Utils.isNotNullOrEmpty(fieldQueryString)) {
if (baseOperation.isApiGroup()) {
logger.warn("Ignoring field selector " + fieldQueryString + " on watch URI " + requestUrl + " as fieldSelector is not yet supported on API Groups APIs");
} else {
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
}
httpUrlBuilder.addQueryParameter("fieldSelector", fieldQueryString);
}

listOptions.setResourceVersion(resourceVersion.get());
HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions);

Expand Down Expand Up @@ -244,6 +235,8 @@ public void onMessage(WebSocket webSocket, String message) {
logger.error("Received wrong type of object for watch", e);
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
} catch (Throwable e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@

package io.fabric8.kubernetes.client.dsl.internal;

import static java.net.HttpURLConnection.HTTP_GONE;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
Expand All @@ -40,18 +53,6 @@
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.net.HttpURLConnection.HTTP_GONE;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> extends AbstractWatchManager<T> {
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
Expand Down Expand Up @@ -101,8 +102,6 @@ public WatchHTTPManager(final OkHttpClient client,
}

private void runWatch() {
logger.debug("Watching via HTTP GET ... {}", this);

HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();
String labelQueryParam = baseOperation.getLabelQueryParam();
if (Utils.isNotNullOrEmpty(labelQueryParam)) {
Expand All @@ -112,17 +111,11 @@ private void runWatch() {
String fieldQueryString = baseOperation.getFieldQueryParam();
String name = baseOperation.getName();

// for API groups we can use the name in the path rather than a fieldSelector
// which is more likely to work well for API Groups
if (name != null && name.length() > 0) {
if (baseOperation.isApiGroup()) {
httpUrlBuilder.addPathSegment(name);
} else {
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
if (fieldQueryString.length() > 0) {
fieldQueryString += ",";
}
fieldQueryString += "metadata.name=" + name;
}

if (Utils.isNotNullOrEmpty(fieldQueryString)) {
Expand All @@ -133,35 +126,43 @@ private void runWatch() {
HttpClientUtils.appendListOptionParams(httpUrlBuilder, listOptions);
String origin = requestUrl.getProtocol() + "://" + requestUrl.getHost();
if (requestUrl.getPort() != -1) {
origin += ":" + requestUrl.getPort();
origin += ":" + requestUrl.getPort();
}

HttpUrl url = httpUrlBuilder.build();

logger.debug("Watching via HTTP GET {}", url);

final Request request = new Request.Builder()
.get()
.url(httpUrlBuilder.build())
.url(url)
.addHeader("Origin", origin)
.build();

clonedClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.info("Watch connection failed. reason: {}", e.getMessage());
scheduleReconnect();
scheduleReconnect(true);
}

@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
onStatus(OperationSupport.createStatus(response.code(), response.message()));
}

boolean shouldBackoff = true;

try {
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
// the normal operation of a long poll get is to return once a response is available.
// in that case we should reconnect immediately.
shouldBackoff = false;
} catch (Exception e) {
logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
}
Expand All @@ -171,12 +172,13 @@ public void onResponse(Call call, Response response) throws IOException {
if (response != null) {
response.body().close();
}
scheduleReconnect();

scheduleReconnect(shouldBackoff);
}
});
}

private void scheduleReconnect() {
private void scheduleReconnect(boolean shouldBackoff) {
if (forceClosed.get()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
Expand All @@ -188,6 +190,7 @@ private void scheduleReconnect() {
}

logger.debug("Submitting reconnect task to the executor");

// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
submit(() -> {
Expand All @@ -199,6 +202,11 @@ private void scheduleReconnect() {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? nextReconnectInterval()
: 0;

schedule(() -> {
try {
WatchHTTPManager.this.runWatch();
Expand All @@ -209,7 +217,7 @@ private void scheduleReconnect() {
close();
watcher.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
}
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);
}, delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// This is a standard exception if we close the scheduler. We should not print it
if (!forceClosed.get()) {
Expand Down Expand Up @@ -243,18 +251,7 @@ public void onMessage(String messageSource) {
}
}
} else if (object instanceof Status) {
Status status = (Status) object;
// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
// exception
// shut down executor, etc.
close();
watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return;
}

watcher.eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status);
onStatus((Status) object);
} else {
logger.error("Unknown message received: {}", messageSource);
}
Expand All @@ -265,6 +262,21 @@ public void onMessage(String messageSource) {
}
}

private void onStatus(Status status) {
// The resource version no longer exists - this has to be handled by the caller.
if (status.getCode() == HTTP_GONE) {
// exception
// shut down executor, etc.
close();
watcher.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status)));
return;
}

watcher.eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status.toString());
}


protected static WatchEvent readWatchEvent(String messageSource) {
WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class);
KubernetesResource object = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void itCompletesOnMatchAdded() throws Exception {
watcher.eventReceived(Action.ADDED, configMap);
assertTrue(watcher.getFuture().isDone());
assertEquals(watcher.getFuture().get(), configMap);
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -61,7 +61,7 @@ void itCompletesOnMatchModified() throws Exception {
watcher.eventReceived(Action.MODIFIED, configMap);
assertTrue(watcher.getFuture().isDone());
assertEquals(watcher.getFuture().get(), configMap);
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -71,7 +71,7 @@ void itCompletesOnMatchDeleted() throws Exception {
watcher.eventReceived(Action.DELETED, configMap);
assertTrue(watcher.getFuture().isDone());
assertNull(watcher.getFuture().get());
condition.isCalledWith(null);
assertTrue(condition.isCalledWith(null));
}

@Test
Expand All @@ -80,7 +80,7 @@ void itDoesNotCompleteOnNoMatchAdded() {
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.ADDED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
Expand All @@ -89,16 +89,23 @@ void itDoesNotCompleteOnNoMatchModified() {
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.MODIFIED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(configMap);
assertTrue(condition.isCalledWith(configMap));
}

@Test
void itDoesNotCompleteOnNoMatchDeleted() {
void itCompletesExceptionallyOnUnexpectedDeletion() throws Exception {
TrackingPredicate condition = condition(Objects::nonNull);
WaitForConditionWatcher<ConfigMap> watcher = new WaitForConditionWatcher<>(condition);
watcher.eventReceived(Action.DELETED, configMap);
assertFalse(watcher.getFuture().isDone());
condition.isCalledWith(null);
assertTrue(watcher.getFuture().isDone());
try {
watcher.getFuture().get();
fail("should have thrown exception");
} catch (ExecutionException e) {
assertEquals(WatcherException.class, e.getCause().getClass());
assertEquals("Unexpected deletion of watched resource, will never satisfy condition", e.getCause().getMessage());
}
assertTrue(condition.isCalledWith(null));
}

@Test
Expand Down

0 comments on commit c768aa4

Please sign in to comment.