diff --git a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaTemplates.java b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaTemplates.java index 1be11ef83c..531be7bbb9 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaTemplates.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/templates/crd/KafkaTemplates.java @@ -95,11 +95,18 @@ public static KafkaBuilder kafkaJBOD(String name, int kafkaReplicas, int zookeep } public static KafkaBuilder kafkaWithMetrics(String name, int kafkaReplicas, int zookeeperReplicas) { + return kafkaWithMetrics(name, kubeClient().getNamespace(), kafkaReplicas, zookeeperReplicas); + } + + public static KafkaBuilder kafkaWithMetrics(String name, String namespace, int kafkaReplicas, int zookeeperReplicas) { Kafka kafka = getKafkaFromYaml(PATH_TO_KAFKA_METRICS_CONFIG); ConfigMap metricsCm = TestUtils.configMapFromYaml(PATH_TO_KAFKA_METRICS_CONFIG, "kafka-metrics"); - KubeClusterResource.kubeClient().getClient().configMaps().inNamespace(kubeClient().getNamespace()).createOrReplace(metricsCm); + KubeClusterResource.kubeClient().getClient().configMaps().inNamespace(namespace).createOrReplace(metricsCm); return defaultKafka(kafka, name, kafkaReplicas, zookeeperReplicas) + .editOrNewMetadata() + .withNamespace(namespace) + .endMetadata() .editSpec() .withNewKafkaExporter() .endKafkaExporter() diff --git a/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java b/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java index 20f8d495a8..4223ddc17c 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/metrics/MetricsST.java @@ -94,7 +94,8 @@ public class MetricsST extends AbstractST { private static final Logger LOGGER = LogManager.getLogger(MetricsST.class); - public static final String NAMESPACE = "metrics-cluster-test"; + public static final String FIRST_NAMESPACE = "first-metrics-cluster-test"; + public static final String SECOND_NAMESPACE = "second-metrics-cluster-test"; public static final String SECOND_CLUSTER = "second-kafka-cluster"; public static final String MIRROR_MAKER_CLUSTER = "mm2-cluster"; private static final String BRIDGE_CLUSTER = "my-bridge"; @@ -128,7 +129,7 @@ void testKafkaBrokersCount() { void testKafkaTopicPartitions() { Pattern topicPartitions = Pattern.compile("kafka_server_replicamanager_partitioncount ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE); ArrayList values = MetricsUtils.collectSpecificMetric(topicPartitions, kafkaMetricsData); - assertThat("Topic partitions count doesn't match expected value", values.stream().mapToDouble(i -> i).sum(), is(424.0)); + assertThat("Topic partitions count doesn't match expected value", values.stream().mapToDouble(i -> i).sum(), is(423.0)); } @ParallelTest @@ -198,11 +199,11 @@ void testKafkaConnectIoNetwork() { @IsolatedTest @Tag(ACCEPTANCE) @Tag(INTERNAL_CLIENTS_USED) - void testKafkaExporterDataAfterExchange(ExtensionContext extensionContext) { + void testKafkaExporterDataAfterExchange() { InternalKafkaClient internalKafkaClient = new InternalKafkaClient.Builder() .withUsingPodName(kafkaClientsPodName) .withTopicName(topicName) - .withNamespaceName(NAMESPACE) + .withNamespaceName(FIRST_NAMESPACE) .withClusterName(metricsClusterName) .withMessageCount(5000) .withListenerName(Constants.PLAIN_LISTENER_DEFAULT_NAME) @@ -275,33 +276,33 @@ void testClusterOperatorMetrics(ExtensionContext extensionContext) { } assertCoMetricResources(Kafka.RESOURCE_KIND, 2); - assertCoMetricResourceState(Kafka.RESOURCE_KIND, metricsClusterName, 1, "none"); - assertCoMetricResourceState(Kafka.RESOURCE_KIND, SECOND_CLUSTER, 1, "none"); + assertCoMetricResourceState(Kafka.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 1, "none"); + assertCoMetricResourceState(Kafka.RESOURCE_KIND, SECOND_CLUSTER, SECOND_NAMESPACE, 1, "none"); assertCoMetricResources(KafkaBridge.RESOURCE_KIND, 1); - assertCoMetricResourceState(KafkaBridge.RESOURCE_KIND, BRIDGE_CLUSTER, 1, "none"); + assertCoMetricResourceState(KafkaBridge.RESOURCE_KIND, BRIDGE_CLUSTER, FIRST_NAMESPACE, 1, "none"); assertCoMetricResources(KafkaConnect.RESOURCE_KIND, 1); - assertCoMetricResourceState(KafkaConnect.RESOURCE_KIND, metricsClusterName, 1, "none"); + assertCoMetricResourceState(KafkaConnect.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 1, "none"); assertCoMetricResources(KafkaConnectS2I.RESOURCE_KIND, 0); - assertCoMetricResourceState(KafkaConnectS2I.RESOURCE_KIND, metricsClusterName, 0, "Both KafkaConnect and KafkaConnectS2I exist with the same name. " + + assertCoMetricResourceState(KafkaConnectS2I.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 0, "Both KafkaConnect and KafkaConnectS2I exist with the same name. " + "KafkaConnect is older and will be used while this custom resource will be ignored."); assertCoMetricResources(KafkaMirrorMaker.RESOURCE_KIND, 0); - assertCoMetricResourceStateNotExists(KafkaMirrorMaker.RESOURCE_KIND, metricsClusterName); + assertCoMetricResourceStateNotExists(KafkaMirrorMaker.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName); assertCoMetricResources(KafkaMirrorMaker2.RESOURCE_KIND, 1); - assertCoMetricResourceState(KafkaMirrorMaker2.RESOURCE_KIND, MIRROR_MAKER_CLUSTER, 1, "none"); + assertCoMetricResourceState(KafkaMirrorMaker2.RESOURCE_KIND, MIRROR_MAKER_CLUSTER, FIRST_NAMESPACE, 1, "none"); assertCoMetricResources(KafkaConnector.RESOURCE_KIND, 0); - assertCoMetricResourceStateNotExists(KafkaConnector.RESOURCE_KIND, metricsClusterName); + assertCoMetricResourceStateNotExists(KafkaConnector.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName); assertCoMetricResources(KafkaRebalance.RESOURCE_KIND, 0); - assertCoMetricResourceStateNotExists(KafkaRebalance.RESOURCE_KIND, metricsClusterName); + assertCoMetricResourceStateNotExists(KafkaRebalance.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName); // Remove s2i CR - KafkaConnectS2IResource.kafkaConnectS2IClient().inNamespace(NAMESPACE).withName(metricsClusterName).delete(); + KafkaConnectS2IResource.kafkaConnectS2IClient().inNamespace(FIRST_NAMESPACE).withName(metricsClusterName).delete(); } @ParallelTest @@ -449,11 +450,11 @@ void testKafkaMetricsSettings() { .withData(Collections.singletonMap(Constants.METRICS_CONFIG_YAML_NAME, metricsConfigYaml)) .withNewMetadata() .withName("external-metrics-cm") - .withNamespace(NAMESPACE) + .withNamespace(SECOND_NAMESPACE) .endMetadata() .build(); - kubeClient().getClient().configMaps().inNamespace(NAMESPACE).createOrReplace(externalMetricsCm); + kubeClient().getClient().configMaps().inNamespace(SECOND_NAMESPACE).createOrReplace(externalMetricsCm); // spec.kafka.metrics -> spec.kafka.jmxExporterMetrics ConfigMapKeySelector cmks = new ConfigMapKeySelectorBuilder() @@ -465,14 +466,14 @@ void testKafkaMetricsSettings() { .withConfigMapKeyRef(cmks) .endValueFrom() .build(); - KafkaResource.replaceKafkaResource(SECOND_CLUSTER, k -> { + KafkaResource.replaceKafkaResourceInSpecificNamespace(SECOND_CLUSTER, k -> { // JMX metrics have higher priority k.getSpec().getKafka().setMetricsConfig(jmxPrometheusExporterMetrics); k.getSpec().getKafka().setMetrics(null); - }); + }, SECOND_NAMESPACE); - PodUtils.verifyThatRunningPodsAreStable(SECOND_CLUSTER); - ConfigMap actualCm = kubeClient().getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER)); + PodUtils.verifyThatRunningPodsAreStable(SECOND_NAMESPACE, SECOND_CLUSTER); + ConfigMap actualCm = kubeClient(SECOND_NAMESPACE).getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER)); assertThat(actualCm.getData().get(Constants.METRICS_CONFIG_YAML_NAME), is(metricsConfigJson)); // update metrics @@ -480,13 +481,13 @@ void testKafkaMetricsSettings() { .withData(Collections.singletonMap(Constants.METRICS_CONFIG_YAML_NAME, metricsConfigYaml.replace("true", "false"))) .withNewMetadata() .withName("external-metrics-cm") - .withNamespace(NAMESPACE) + .withNamespace(SECOND_NAMESPACE) .endMetadata() .build(); - kubeClient().getClient().configMaps().inNamespace(NAMESPACE).createOrReplace(externalMetricsUpdatedCm); - PodUtils.verifyThatRunningPodsAreStable(SECOND_CLUSTER); - actualCm = kubeClient().getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER)); + kubeClient().getClient().configMaps().inNamespace(SECOND_NAMESPACE).createOrReplace(externalMetricsUpdatedCm); + PodUtils.verifyThatRunningPodsAreStable(SECOND_NAMESPACE, SECOND_CLUSTER); + actualCm = kubeClient(SECOND_NAMESPACE).getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER)); assertThat(actualCm.getData().get(Constants.METRICS_CONFIG_YAML_NAME), is(metricsConfigJson.replace("true", "false"))); } @@ -495,7 +496,7 @@ private String getExporterRunScript(String podName) throws InterruptedException, command.add("cat"); command.add("/tmp/run.sh"); ArrayList executableCommand = new ArrayList<>(); - executableCommand.addAll(Arrays.asList(cmdKubeClient().toString(), "exec", podName, "-n", NAMESPACE, "--")); + executableCommand.addAll(Arrays.asList(cmdKubeClient().toString(), "exec", podName, "-n", FIRST_NAMESPACE, "--")); executableCommand.addAll(command); Exec exec = new Exec(); @@ -516,14 +517,14 @@ private void assertCoMetricNotNull(String metric, String kind, HashMap i).count(), notNullValue()); } - private void assertCoMetricResourceStateNotExists(String kind, String name) { - Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",resource_namespace=\"" + NAMESPACE + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE); + private void assertCoMetricResourceStateNotExists(String kind, String name, String namespace) { + Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",resource_namespace=\"" + namespace + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE); ArrayList values = MetricsUtils.collectSpecificMetric(pattern, clusterOperatorMetricsData); assertThat(values.isEmpty(), is(true)); } - private void assertCoMetricResourceState(String kind, String name, int value, String reason) { - Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",reason=\"" + reason + "\",resource_namespace=\"" + NAMESPACE + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE); + private void assertCoMetricResourceState(String kind, String name, String namespace, int value, String reason) { + Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",reason=\"" + reason + "\",resource_namespace=\"" + namespace + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE); ArrayList values = MetricsUtils.collectSpecificMetric(pattern, clusterOperatorMetricsData); assertThat("strimzi_resource_state for " + kind + " is not " + value, values.stream().mapToDouble(i -> i).sum(), is((double) value)); } @@ -537,9 +538,14 @@ private void assertCoMetricResources(String kind, int value) { @BeforeAll void setupEnvironment(ExtensionContext extensionContext) throws Exception { LOGGER.info("Setting up Environment for MetricsST"); - installClusterOperator(extensionContext, NAMESPACE); + installClusterWideClusterOperator(extensionContext, FIRST_NAMESPACE, Constants.CO_OPERATION_TIMEOUT_DEFAULT, Constants.RECONCILIATION_INTERVAL); + + cluster.createNamespace(SECOND_NAMESPACE); + NetworkPolicyResource.applyDefaultNetworkPolicySettings(extensionContext, Collections.singletonList(SECOND_NAMESPACE)); + + cluster.setNamespace(FIRST_NAMESPACE); - final String kafkaClientsName = NAMESPACE + "-shared-" + Constants.KAFKA_CLIENTS; + final String kafkaClientsName = FIRST_NAMESPACE + "-shared-" + Constants.KAFKA_CLIENTS; // create resources without wait to deploy them simultaneously resourceManager.createResource(extensionContext, false, @@ -553,7 +559,7 @@ void setupEnvironment(ExtensionContext extensionContext) throws Exception { .endEntityOperator() .endSpec() .build(), - KafkaTemplates.kafkaWithMetrics(SECOND_CLUSTER, 1, 1).build(), + KafkaTemplates.kafkaWithMetrics(SECOND_CLUSTER, SECOND_NAMESPACE, 1, 1).build(), KafkaClientsTemplates.kafkaClients(false, kafkaClientsName).build(), KafkaMirrorMaker2Templates.kafkaMirrorMaker2WithMetrics(MIRROR_MAKER_CLUSTER, metricsClusterName, SECOND_CLUSTER, 1).build(), KafkaBridgeTemplates.kafkaBridgeWithMetrics(BRIDGE_CLUSTER, metricsClusterName, KafkaResources.plainBootstrapAddress(metricsClusterName), 1).build() @@ -570,7 +576,7 @@ void setupEnvironment(ExtensionContext extensionContext) throws Exception { kafkaClientsPodName = ResourceManager.kubeClient().listPodsByPrefixInName(kafkaClientsName).get(0).getMetadata().getName(); -// Allow connections from clients to operators pods when NetworkPolicies are set to denied by default + // Allow connections from clients to operators pods when NetworkPolicies are set to denied by default NetworkPolicyResource.allowNetworkPolicySettingsForClusterOperator(extensionContext); NetworkPolicyResource.allowNetworkPolicySettingsForEntityOperator(extensionContext, metricsClusterName); NetworkPolicyResource.allowNetworkPolicySettingsForEntityOperator(extensionContext, SECOND_CLUSTER); @@ -594,10 +600,9 @@ private List getExpectedTopics() { list.add("mirrormaker2-cluster-offsets"); list.add("mirrormaker2-cluster-status"); list.add("mm2-offset-syncs.my-cluster.internal"); - list.add("my-cluster-connect-config"); - list.add("my-cluster-connect-offsets"); - list.add("my-cluster-connect-status"); - list.add("second-kafka-cluster.checkpoints.internal"); + list.add(metricsClusterName + "-connect-config"); + list.add(metricsClusterName + "-connect-offsets"); + list.add(metricsClusterName + "-connect-status"); list.add("heartbeats"); list.add(topicName); list.add(bridgeTopic);