Skip to content

Commit

Permalink
Use ConfigMapList's resourceVersion for watching (AtomicLong)
Browse files Browse the repository at this point in the history
Rationale: avoid ERROR notifications during rolling upgrades.

Using the greatest `resourceVersion` of all the `ConfigMap`s returned
within the `ConfigMapList` works as expected for *fresh* deployments.
But, when performing a *rolling upgrade* (and depending on the upgrade
strategy), the watcher happens to frequently stop after having received
an `ERROR` notification:
>>>
[ERROR] [KubernetesConfigMapWatcher] [] Kubernetes API returned an error
for a ConfigMap watch event: ConfigMapWatchEvent{type=ERROR,
object=ConfigMap{metadata=Metadata{name='null', namespace='null',
uid='null', labels={}, resourceVersion=null}, data={}}}
>>>
What's actually streamed in that case is a `Status` object such as:
```json
{
  "type": "ERROR",
  "object": {
    "kind": "Status",
    "apiVersion": "v1",
    "metadata": {},
    "status": "Expired",
    "message": "too old resource version: 123 (456)",
    "reason": "Gone",
    "code": 410
  }
}
```

A few references:
* ManageIQ/kubeclient#452
* https://www.baeldung.com/java-kubernetes-watch#1-resource-versions

It's possible to recover by adding some logic to reinstall the watcher
starting with the newly advertised `resourceVersion`, but this may be
avoided at all by starting the initial watch at the `resourceVersion`
of the `ConfigMapList` itself : this one won't expire.

The proposed implementation consists in storing last received
`resourceVersion` as a side effect of `getPropertySourcesFromConfigMaps`
(inspired by how `PropertySource`s get cached through
`configMapAsPropertySource`) and later using it when installing the
watcher.
  • Loading branch information
rdesgroppes committed May 17, 2021
1 parent ab49598 commit 68d5b0e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micronaut.kubernetes.client.v1.configmaps;

import io.micronaut.core.annotation.Introspected;
import io.micronaut.kubernetes.client.v1.KubernetesObject;

import java.util.Collections;
import java.util.List;
Expand All @@ -28,7 +29,7 @@
* @since 1.0.0
*/
@Introspected
public class ConfigMapList {
public class ConfigMapList extends KubernetesObject {

private List<ConfigMap> items;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;

import static io.micronaut.kubernetes.configuration.KubernetesConfigurationClient.KUBERNETES_CONFIG_MAP_NAME_SUFFIX;
import static io.micronaut.kubernetes.configuration.KubernetesConfigurationClient.getLastConfigMapListResourceVersion;
import static io.micronaut.kubernetes.util.KubernetesUtils.computePodLabelSelector;

/**
Expand Down Expand Up @@ -80,7 +80,7 @@ public KubernetesConfigMapWatcher(Environment environment, KubernetesClient clie
@SuppressWarnings("ResultOfMethodCallIgnored")
@Override
public void onApplicationEvent(ServiceReadyEvent event) {
long lastResourceVersion = computeLastResourceVersion();
long lastResourceVersion = getLastConfigMapListResourceVersion();
Map<String, String> labels = configuration.getConfigMaps().getLabels();
Flowable<String> singleLabelSelector = computePodLabelSelector(client, configuration.getConfigMaps().getPodLabels(), configuration.getNamespace(), labels);

Expand All @@ -100,23 +100,6 @@ public void onApplicationEvent(ServiceReadyEvent event) {
.subscribe(this::processEvent);
}

private long computeLastResourceVersion() {
long lastResourceVersion = this.environment
.getPropertySources()
.stream()
.filter(propertySource -> propertySource.getName().endsWith(KUBERNETES_CONFIG_MAP_NAME_SUFFIX))
.map(propertySource -> propertySource.get(KubernetesConfigurationClient.CONFIG_MAP_RESOURCE_VERSION))
.map(o -> Long.parseLong(o.toString()))
.max(Long::compareTo)
.orElse(0L);

if (LOG.isDebugEnabled()) {
LOG.debug("Latest resourceVersion is: {}", lastResourceVersion);
}

return lastResourceVersion;
}

private void processEvent(ConfigMapWatchEvent event) {
switch (event.getType()) {
case ADDED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static io.micronaut.kubernetes.client.v1.secrets.Secret.OPAQUE_SECRET_TYPE;
import static io.micronaut.kubernetes.util.KubernetesUtils.computePodLabelSelector;
Expand All @@ -66,6 +67,7 @@ public class KubernetesConfigurationClient implements ConfigurationClient {
private static final Logger LOG = LoggerFactory.getLogger(KubernetesConfigurationClient.class);

private static Map<String, PropertySource> propertySources = new ConcurrentHashMap<>();
private static AtomicLong lastConfigMapListResourceVersion = new AtomicLong();

private final KubernetesClient client;
private final KubernetesConfiguration configuration;
Expand Down Expand Up @@ -140,6 +142,13 @@ static Map<String, PropertySource> getPropertySourceCache() {
return propertySources;
}

/**
* @return the last config map list resource version.
*/
static long getLastConfigMapListResourceVersion() {
return lastConfigMapListResourceVersion.get();
}

private Flowable<PropertySource> getPropertySourcesFromConfigMaps() {
Predicate<KubernetesObject> includesFilter = KubernetesUtils.getIncludesFilter(configuration.getConfigMaps().getIncludes());
Predicate<KubernetesObject> excludesFilter = KubernetesUtils.getExcludesFilter(configuration.getConfigMaps().getExcludes());
Expand All @@ -150,9 +159,11 @@ private Flowable<PropertySource> getPropertySourcesFromConfigMaps() {
.doOnError(throwable -> LOG.error("Error while trying to list all Kubernetes ConfigMaps in the namespace [" + configuration.getNamespace() + "]", throwable))
.onErrorReturn(throwable -> new ConfigMapList())
.doOnNext(configMapList -> {
final String resourceVersion = configMapList.getMetadata().getResourceVersion();
if (LOG.isDebugEnabled()) {
LOG.debug("Found {} config maps. Applying includes/excludes filters (if any)", configMapList.getItems().size());
LOG.debug("Found {} config maps in list version {}. Applying includes/excludes filters (if any)", configMapList.getItems().size(), resourceVersion);
}
lastConfigMapListResourceVersion.set(Long.parseLong(resourceVersion));
})
.flatMapIterable(ConfigMapList::getItems)
.filter(includesFilter)
Expand Down

0 comments on commit 68d5b0e

Please sign in to comment.