Skip to content

Commit

Permalink
split kafka clusters to seperate namespaces (#4925)
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <lukywill16@gmail.com>
  • Loading branch information
im-konge authored and scholzj committed May 10, 2021
1 parent 681e758 commit 9a02329
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,18 @@ public static KafkaBuilder kafkaJBOD(String name, int kafkaReplicas, int zookeep
}

public static KafkaBuilder kafkaWithMetrics(String name, int kafkaReplicas, int zookeeperReplicas) {
return kafkaWithMetrics(name, kubeClient().getNamespace(), kafkaReplicas, zookeeperReplicas);
}

public static KafkaBuilder kafkaWithMetrics(String name, String namespace, int kafkaReplicas, int zookeeperReplicas) {
Kafka kafka = getKafkaFromYaml(PATH_TO_KAFKA_METRICS_CONFIG);

ConfigMap metricsCm = TestUtils.configMapFromYaml(PATH_TO_KAFKA_METRICS_CONFIG, "kafka-metrics");
KubeClusterResource.kubeClient().getClient().configMaps().inNamespace(kubeClient().getNamespace()).createOrReplace(metricsCm);
KubeClusterResource.kubeClient().getClient().configMaps().inNamespace(namespace).createOrReplace(metricsCm);
return defaultKafka(kafka, name, kafkaReplicas, zookeeperReplicas)
.editOrNewMetadata()
.withNamespace(namespace)
.endMetadata()
.editSpec()
.withNewKafkaExporter()
.endKafkaExporter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public class MetricsST extends AbstractST {

private static final Logger LOGGER = LogManager.getLogger(MetricsST.class);

public static final String NAMESPACE = "metrics-cluster-test";
public static final String FIRST_NAMESPACE = "first-metrics-cluster-test";
public static final String SECOND_NAMESPACE = "second-metrics-cluster-test";
public static final String SECOND_CLUSTER = "second-kafka-cluster";
public static final String MIRROR_MAKER_CLUSTER = "mm2-cluster";
private static final String BRIDGE_CLUSTER = "my-bridge";
Expand Down Expand Up @@ -128,7 +129,7 @@ void testKafkaBrokersCount() {
void testKafkaTopicPartitions() {
Pattern topicPartitions = Pattern.compile("kafka_server_replicamanager_partitioncount ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE);
ArrayList<Double> values = MetricsUtils.collectSpecificMetric(topicPartitions, kafkaMetricsData);
assertThat("Topic partitions count doesn't match expected value", values.stream().mapToDouble(i -> i).sum(), is(424.0));
assertThat("Topic partitions count doesn't match expected value", values.stream().mapToDouble(i -> i).sum(), is(423.0));
}

@ParallelTest
Expand Down Expand Up @@ -198,11 +199,11 @@ void testKafkaConnectIoNetwork() {
@IsolatedTest
@Tag(ACCEPTANCE)
@Tag(INTERNAL_CLIENTS_USED)
void testKafkaExporterDataAfterExchange(ExtensionContext extensionContext) {
void testKafkaExporterDataAfterExchange() {
InternalKafkaClient internalKafkaClient = new InternalKafkaClient.Builder()
.withUsingPodName(kafkaClientsPodName)
.withTopicName(topicName)
.withNamespaceName(NAMESPACE)
.withNamespaceName(FIRST_NAMESPACE)
.withClusterName(metricsClusterName)
.withMessageCount(5000)
.withListenerName(Constants.PLAIN_LISTENER_DEFAULT_NAME)
Expand Down Expand Up @@ -275,33 +276,33 @@ void testClusterOperatorMetrics(ExtensionContext extensionContext) {
}

assertCoMetricResources(Kafka.RESOURCE_KIND, 2);
assertCoMetricResourceState(Kafka.RESOURCE_KIND, metricsClusterName, 1, "none");
assertCoMetricResourceState(Kafka.RESOURCE_KIND, SECOND_CLUSTER, 1, "none");
assertCoMetricResourceState(Kafka.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 1, "none");
assertCoMetricResourceState(Kafka.RESOURCE_KIND, SECOND_CLUSTER, SECOND_NAMESPACE, 1, "none");

assertCoMetricResources(KafkaBridge.RESOURCE_KIND, 1);
assertCoMetricResourceState(KafkaBridge.RESOURCE_KIND, BRIDGE_CLUSTER, 1, "none");
assertCoMetricResourceState(KafkaBridge.RESOURCE_KIND, BRIDGE_CLUSTER, FIRST_NAMESPACE, 1, "none");

assertCoMetricResources(KafkaConnect.RESOURCE_KIND, 1);
assertCoMetricResourceState(KafkaConnect.RESOURCE_KIND, metricsClusterName, 1, "none");
assertCoMetricResourceState(KafkaConnect.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 1, "none");

assertCoMetricResources(KafkaConnectS2I.RESOURCE_KIND, 0);
assertCoMetricResourceState(KafkaConnectS2I.RESOURCE_KIND, metricsClusterName, 0, "Both KafkaConnect and KafkaConnectS2I exist with the same name. " +
assertCoMetricResourceState(KafkaConnectS2I.RESOURCE_KIND, metricsClusterName, FIRST_NAMESPACE, 0, "Both KafkaConnect and KafkaConnectS2I exist with the same name. " +
"KafkaConnect is older and will be used while this custom resource will be ignored.");

assertCoMetricResources(KafkaMirrorMaker.RESOURCE_KIND, 0);
assertCoMetricResourceStateNotExists(KafkaMirrorMaker.RESOURCE_KIND, metricsClusterName);
assertCoMetricResourceStateNotExists(KafkaMirrorMaker.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName);

assertCoMetricResources(KafkaMirrorMaker2.RESOURCE_KIND, 1);
assertCoMetricResourceState(KafkaMirrorMaker2.RESOURCE_KIND, MIRROR_MAKER_CLUSTER, 1, "none");
assertCoMetricResourceState(KafkaMirrorMaker2.RESOURCE_KIND, MIRROR_MAKER_CLUSTER, FIRST_NAMESPACE, 1, "none");

assertCoMetricResources(KafkaConnector.RESOURCE_KIND, 0);
assertCoMetricResourceStateNotExists(KafkaConnector.RESOURCE_KIND, metricsClusterName);
assertCoMetricResourceStateNotExists(KafkaConnector.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName);

assertCoMetricResources(KafkaRebalance.RESOURCE_KIND, 0);
assertCoMetricResourceStateNotExists(KafkaRebalance.RESOURCE_KIND, metricsClusterName);
assertCoMetricResourceStateNotExists(KafkaRebalance.RESOURCE_KIND, FIRST_NAMESPACE, metricsClusterName);

// Remove s2i CR
KafkaConnectS2IResource.kafkaConnectS2IClient().inNamespace(NAMESPACE).withName(metricsClusterName).delete();
KafkaConnectS2IResource.kafkaConnectS2IClient().inNamespace(FIRST_NAMESPACE).withName(metricsClusterName).delete();
}

@ParallelTest
Expand Down Expand Up @@ -449,11 +450,11 @@ void testKafkaMetricsSettings() {
.withData(Collections.singletonMap(Constants.METRICS_CONFIG_YAML_NAME, metricsConfigYaml))
.withNewMetadata()
.withName("external-metrics-cm")
.withNamespace(NAMESPACE)
.withNamespace(SECOND_NAMESPACE)
.endMetadata()
.build();

kubeClient().getClient().configMaps().inNamespace(NAMESPACE).createOrReplace(externalMetricsCm);
kubeClient().getClient().configMaps().inNamespace(SECOND_NAMESPACE).createOrReplace(externalMetricsCm);

// spec.kafka.metrics -> spec.kafka.jmxExporterMetrics
ConfigMapKeySelector cmks = new ConfigMapKeySelectorBuilder()
Expand All @@ -465,28 +466,28 @@ void testKafkaMetricsSettings() {
.withConfigMapKeyRef(cmks)
.endValueFrom()
.build();
KafkaResource.replaceKafkaResource(SECOND_CLUSTER, k -> {
KafkaResource.replaceKafkaResourceInSpecificNamespace(SECOND_CLUSTER, k -> {
// JMX metrics have higher priority
k.getSpec().getKafka().setMetricsConfig(jmxPrometheusExporterMetrics);
k.getSpec().getKafka().setMetrics(null);
});
}, SECOND_NAMESPACE);

PodUtils.verifyThatRunningPodsAreStable(SECOND_CLUSTER);
ConfigMap actualCm = kubeClient().getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER));
PodUtils.verifyThatRunningPodsAreStable(SECOND_NAMESPACE, SECOND_CLUSTER);
ConfigMap actualCm = kubeClient(SECOND_NAMESPACE).getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER));
assertThat(actualCm.getData().get(Constants.METRICS_CONFIG_YAML_NAME), is(metricsConfigJson));

// update metrics
ConfigMap externalMetricsUpdatedCm = new ConfigMapBuilder()
.withData(Collections.singletonMap(Constants.METRICS_CONFIG_YAML_NAME, metricsConfigYaml.replace("true", "false")))
.withNewMetadata()
.withName("external-metrics-cm")
.withNamespace(NAMESPACE)
.withNamespace(SECOND_NAMESPACE)
.endMetadata()
.build();

kubeClient().getClient().configMaps().inNamespace(NAMESPACE).createOrReplace(externalMetricsUpdatedCm);
PodUtils.verifyThatRunningPodsAreStable(SECOND_CLUSTER);
actualCm = kubeClient().getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER));
kubeClient().getClient().configMaps().inNamespace(SECOND_NAMESPACE).createOrReplace(externalMetricsUpdatedCm);
PodUtils.verifyThatRunningPodsAreStable(SECOND_NAMESPACE, SECOND_CLUSTER);
actualCm = kubeClient(SECOND_NAMESPACE).getConfigMap(KafkaResources.kafkaMetricsAndLogConfigMapName(SECOND_CLUSTER));
assertThat(actualCm.getData().get(Constants.METRICS_CONFIG_YAML_NAME), is(metricsConfigJson.replace("true", "false")));
}

Expand All @@ -495,7 +496,7 @@ private String getExporterRunScript(String podName) throws InterruptedException,
command.add("cat");
command.add("/tmp/run.sh");
ArrayList<String> executableCommand = new ArrayList<>();
executableCommand.addAll(Arrays.asList(cmdKubeClient().toString(), "exec", podName, "-n", NAMESPACE, "--"));
executableCommand.addAll(Arrays.asList(cmdKubeClient().toString(), "exec", podName, "-n", FIRST_NAMESPACE, "--"));
executableCommand.addAll(command);

Exec exec = new Exec();
Expand All @@ -516,14 +517,14 @@ private void assertCoMetricNotNull(String metric, String kind, HashMap<String, S
assertThat(values.stream().mapToDouble(i -> i).count(), notNullValue());
}

private void assertCoMetricResourceStateNotExists(String kind, String name) {
Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",resource_namespace=\"" + NAMESPACE + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE);
private void assertCoMetricResourceStateNotExists(String kind, String name, String namespace) {
Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",resource_namespace=\"" + namespace + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE);
ArrayList<Double> values = MetricsUtils.collectSpecificMetric(pattern, clusterOperatorMetricsData);
assertThat(values.isEmpty(), is(true));
}

private void assertCoMetricResourceState(String kind, String name, int value, String reason) {
Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",reason=\"" + reason + "\",resource_namespace=\"" + NAMESPACE + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE);
private void assertCoMetricResourceState(String kind, String name, String namespace, int value, String reason) {
Pattern pattern = Pattern.compile("strimzi_resource_state\\{kind=\"" + kind + "\",name=\"" + name + "\",reason=\"" + reason + "\",resource_namespace=\"" + namespace + "\",} ([\\d.][^\\n]+)", Pattern.CASE_INSENSITIVE);
ArrayList<Double> values = MetricsUtils.collectSpecificMetric(pattern, clusterOperatorMetricsData);
assertThat("strimzi_resource_state for " + kind + " is not " + value, values.stream().mapToDouble(i -> i).sum(), is((double) value));
}
Expand All @@ -537,9 +538,14 @@ private void assertCoMetricResources(String kind, int value) {
@BeforeAll
void setupEnvironment(ExtensionContext extensionContext) throws Exception {
LOGGER.info("Setting up Environment for MetricsST");
installClusterOperator(extensionContext, NAMESPACE);
installClusterWideClusterOperator(extensionContext, FIRST_NAMESPACE, Constants.CO_OPERATION_TIMEOUT_DEFAULT, Constants.RECONCILIATION_INTERVAL);

cluster.createNamespace(SECOND_NAMESPACE);
NetworkPolicyResource.applyDefaultNetworkPolicySettings(extensionContext, Collections.singletonList(SECOND_NAMESPACE));

cluster.setNamespace(FIRST_NAMESPACE);

final String kafkaClientsName = NAMESPACE + "-shared-" + Constants.KAFKA_CLIENTS;
final String kafkaClientsName = FIRST_NAMESPACE + "-shared-" + Constants.KAFKA_CLIENTS;

// create resources without wait to deploy them simultaneously
resourceManager.createResource(extensionContext, false,
Expand All @@ -553,7 +559,7 @@ void setupEnvironment(ExtensionContext extensionContext) throws Exception {
.endEntityOperator()
.endSpec()
.build(),
KafkaTemplates.kafkaWithMetrics(SECOND_CLUSTER, 1, 1).build(),
KafkaTemplates.kafkaWithMetrics(SECOND_CLUSTER, SECOND_NAMESPACE, 1, 1).build(),
KafkaClientsTemplates.kafkaClients(false, kafkaClientsName).build(),
KafkaMirrorMaker2Templates.kafkaMirrorMaker2WithMetrics(MIRROR_MAKER_CLUSTER, metricsClusterName, SECOND_CLUSTER, 1).build(),
KafkaBridgeTemplates.kafkaBridgeWithMetrics(BRIDGE_CLUSTER, metricsClusterName, KafkaResources.plainBootstrapAddress(metricsClusterName), 1).build()
Expand All @@ -570,7 +576,7 @@ void setupEnvironment(ExtensionContext extensionContext) throws Exception {

kafkaClientsPodName = ResourceManager.kubeClient().listPodsByPrefixInName(kafkaClientsName).get(0).getMetadata().getName();

// Allow connections from clients to operators pods when NetworkPolicies are set to denied by default
// Allow connections from clients to operators pods when NetworkPolicies are set to denied by default
NetworkPolicyResource.allowNetworkPolicySettingsForClusterOperator(extensionContext);
NetworkPolicyResource.allowNetworkPolicySettingsForEntityOperator(extensionContext, metricsClusterName);
NetworkPolicyResource.allowNetworkPolicySettingsForEntityOperator(extensionContext, SECOND_CLUSTER);
Expand All @@ -594,10 +600,9 @@ private List<String> getExpectedTopics() {
list.add("mirrormaker2-cluster-offsets");
list.add("mirrormaker2-cluster-status");
list.add("mm2-offset-syncs.my-cluster.internal");
list.add("my-cluster-connect-config");
list.add("my-cluster-connect-offsets");
list.add("my-cluster-connect-status");
list.add("second-kafka-cluster.checkpoints.internal");
list.add(metricsClusterName + "-connect-config");
list.add(metricsClusterName + "-connect-offsets");
list.add(metricsClusterName + "-connect-status");
list.add("heartbeats");
list.add(topicName);
list.add(bridgeTopic);
Expand Down

0 comments on commit 9a02329

Please sign in to comment.