Skip to content

Commit

Permalink
[FLINK-35183] MinorVersion metric for tracking applications
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed May 9, 2024
1 parent 04829bf commit 84d9b74
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,17 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
// map(namespace, map(version, set(deployment)))
private final Map<String, Map<String, Set<String>>> deploymentFlinkVersions =
new ConcurrentHashMap<>();
// map(namespace, map(version, set(deployment)))
private final Map<String, Map<String, Set<String>>> deploymentFlinkMinorVersions =
new ConcurrentHashMap<>();
// map(namespace, map(deployment, cpu))
private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>();
// map(namespace, map(deployment, memory))
private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>();
public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion";
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
public static final String UNKNOWN_VERSION = "UNKNOWN";
public static final String MALFORMED_MINOR_VERSION = "MALFORMED";
public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
public static final String RESOURCE_USAGE_GROUP_NAME = "ResourceUsage";
public static final String COUNTER_NAME = "Count";
Expand Down Expand Up @@ -77,12 +83,13 @@ public void onUpdate(FlinkDeployment flinkApp) {
.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
.add(deploymentName);

// Full runtime version queried from the JobManager REST API
var flinkVersion =
flinkApp.getStatus()
.getClusterInfo()
.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) {
flinkVersion = "UNKNOWN";
flinkVersion = UNKNOWN_VERSION;
}
deploymentFlinkVersions
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
Expand All @@ -94,6 +101,22 @@ public void onUpdate(FlinkDeployment flinkApp) {
})
.add(deploymentName);

// Minor version computed from the above
var subVersions = flinkVersion.split("\\.");
String minorVersion = MALFORMED_MINOR_VERSION;
if (subVersions.length >= 2) {
minorVersion = subVersions[0].concat(".").concat(subVersions[1]);
}
deploymentFlinkMinorVersions
.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
.computeIfAbsent(
minorVersion,
v -> {
initFlinkMinorVersions(namespace, v);
return ConcurrentHashMap.newKeySet();
})
.add(deploymentName);

var totalCpu =
NumberUtils.toDouble(
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "0"));
Expand Down Expand Up @@ -133,6 +156,12 @@ public void onRemove(FlinkDeployment flinkApp) {
if (deploymentFlinkVersions.containsKey(namespace)) {
deploymentFlinkVersions.get(namespace).values().forEach(names -> names.remove(name));
}
if (deploymentFlinkMinorVersions.containsKey(namespace)) {
deploymentFlinkMinorVersions
.get(namespace)
.values()
.forEach(names -> names.remove(name));
}
if (deploymentCpuUsage.containsKey(namespace)) {
deploymentCpuUsage.get(namespace).remove(name);
}
Expand Down Expand Up @@ -165,13 +194,21 @@ private void initNamespaceStatusCounts(String ns) {
private void initFlinkVersions(String ns, String flinkVersion) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.addGroup(FLINK_VERSION_GROUP_NAME)
.addGroup(flinkVersion)
.addGroup(FLINK_VERSION_GROUP_NAME, flinkVersion)
.gauge(
COUNTER_NAME,
() -> deploymentFlinkVersions.get(ns).get(flinkVersion).size());
}

private void initFlinkMinorVersions(String ns, String minorVersion) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.addGroup(FLINK_MINOR_VERSION_GROUP_NAME, minorVersion)
.gauge(
COUNTER_NAME,
() -> deploymentFlinkMinorVersions.get(ns).get(minorVersion).size());
}

private void initNamespaceCpuUsage(String ns) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.COUNTER_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.CPU_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_MINOR_VERSION_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
Expand Down Expand Up @@ -226,13 +227,14 @@ public void testMetricsMultiNamespace() {
public void testFlinkVersionMetrics() {
Map<String, String> ns1Values = new HashMap<>();
ns1Values.put("deployment1", " ");
ns1Values.put("deployment2", "1.14");
ns1Values.put("deployment3", "1.14");
ns1Values.put("deployment4", "1.15");
ns1Values.put("deployment5", "1.15");
ns1Values.put("deployment6", "1.16");
ns1Values.put("deployment7", "1.17");
ns1Values.put("deployment8", "1.14");
ns1Values.put("deployment2", "1.14.0");
ns1Values.put("deployment3", "1.14.0");
ns1Values.put("deployment4", "1.15.1");
ns1Values.put("deployment5", "1.15.1");
ns1Values.put("deployment6", "1.16.0");
ns1Values.put("deployment7", "1.17.1");
ns1Values.put("deployment8", "1.14.1");
ns1Values.put("deployment9", "test");

Map<String, String> ns2Values = new HashMap<>();
ns2Values.put("deployment1", "");
Expand All @@ -248,20 +250,41 @@ public void testFlinkVersionMetrics() {
var namespaceVersions = Map.of("ns1", ns1Values, "ns2", ns2Values);
var expected =
Map.of(
"ns1", Map.of("UNKNOWN", 1, "1.14", 3, "1.15", 2, "1.16", 1, "1.17", 1),
"ns1",
Map.of(
"UNKNOWN", 1, "1.14.0", 2, "1.14.1", 1, "1.15.1", 2,
"1.16.0", 1, "1.17.1", 1, "test", 1),
"ns2", Map.of("UNKNOWN", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
updateFlinkVersionsAndAssert(namespaceVersions, expected);
updateFlinkVersionsAndAssert(FLINK_VERSION_GROUP_NAME, namespaceVersions, expected);

// Remove invalid version and insert 1.14
namespaceVersions.get("ns1").put("deployment1", "1.14");
var expectedMinors =
Map.of(
"ns1", Map.of("MALFORMED", 2, "1.14", 3, "1.15", 2, "1.16", 1, "1.17", 1),
"ns2", Map.of("MALFORMED", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
updateFlinkVersionsAndAssert(
FLINK_MINOR_VERSION_GROUP_NAME, namespaceVersions, expectedMinors);

// Remove invalid version and insert 1.14.1
namespaceVersions.get("ns1").put("deployment1", "1.14.1");
expected =
Map.of(
"ns1", Map.of("UNKNOWN", 0, "1.14", 4, "1.15", 2, "1.16", 1, "1.17", 1),
"ns1",
Map.of(
"1.14.0", 2, "1.14.1", 2, "1.15.1", 2, "1.16.0", 1,
"1.17.1", 1, "test", 1),
"ns2", Map.of("UNKNOWN", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
updateFlinkVersionsAndAssert(namespaceVersions, expected);
updateFlinkVersionsAndAssert(FLINK_VERSION_GROUP_NAME, namespaceVersions, expected);

expectedMinors =
Map.of(
"ns1", Map.of("MALFORMED", 1, "1.14", 4, "1.15", 2, "1.16", 1, "1.17", 1),
"ns2", Map.of("MALFORMED", 2, "1.14", 2, "1.15", 3, "1.16", 1, "1.17", 1));
updateFlinkVersionsAndAssert(
FLINK_MINOR_VERSION_GROUP_NAME, namespaceVersions, expectedMinors);
}

private void updateFlinkVersionsAndAssert(
String metricGroup,
Map<String, Map<String, String>> namespaceVersions,
Map<String, Map<String, Integer>> expected) {
for (var namespaceEntry : namespaceVersions.entrySet()) {
Expand Down Expand Up @@ -291,7 +314,7 @@ private void updateFlinkVersionsAndAssert(
listener.getNamespaceMetricId(
FlinkDeployment.class,
namespaceName,
FLINK_VERSION_GROUP_NAME,
metricGroup,
version,
COUNTER_NAME);

Expand Down

0 comments on commit 84d9b74

Please sign in to comment.