diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java index cad95c9624..7893ac89f0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java @@ -42,12 +42,14 @@ import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Optional; /** The reconciler for the {@link FlinkSessionJob}. */ public class FlinkSessionJobReconciler implements Reconciler { private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class); + public static final String LABEL_SESSION_CLUSTER = "target.session.name"; private final FlinkConfigManager configManager; private final KubernetesClient kubernetesClient; @@ -76,6 +78,24 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E OperatorUtils.getSecondaryResource( flinkSessionJob, context, configManager.getOperatorConfiguration()); + var labels = flinkSessionJob.getMetadata().getLabels(); + if (labels == null) { + labels = new HashMap<>(); + } + if (!flinkSessionJob + .getSpec() + .getDeploymentName() + .equals(labels.get(LABEL_SESSION_CLUSTER))) { + labels.put(LABEL_SESSION_CLUSTER, flinkSessionJob.getSpec().getDeploymentName()); + flinkSessionJob.getMetadata().setLabels(labels); + kubernetesClient + .resources(FlinkSessionJob.class) + .inNamespace(flinkSessionJob.getMetadata().getNamespace()) + .withName(flinkSessionJob.getMetadata().getName()) + .patch(flinkSessionJob); + return; + } + // if session cluster is not ready, we can't do reconcile for the job. if (!helper.sessionClusterReady(flinkDepOptional)) { return; diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 38ea772864..f57a23ad06 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -42,6 +42,7 @@ import org.apache.flink.kubernetes.operator.metrics.MetricManager; import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory; import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory; +import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler; import org.apache.flink.kubernetes.operator.utils.StatusHelper; import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; import org.apache.flink.metrics.testutils.MetricListener; @@ -131,6 +132,10 @@ public static FlinkSessionJob buildSessionJob() { new ObjectMetaBuilder() .withName(TEST_SESSION_JOB_NAME) .withNamespace(TEST_NAMESPACE) + .withLabels( + Map.of( + FlinkSessionJobReconciler.LABEL_SESSION_CLUSTER, + TEST_DEPLOYMENT_NAME)) .build()); sessionJob.setSpec( FlinkSessionJobSpec.builder() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java index 645cf84452..0277e54a96 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconcilerTest.java @@ -23,12 +23,16 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; +import org.apache.flink.kubernetes.operator.crd.CrdConstants; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.javaoperatorsdk.operator.api.reconciler.Context; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -43,9 +47,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for {@link FlinkSessionJobReconciler}. */ +@EnableKubernetesMockClient(crud = true) public class FlinkSessionJobReconcilerTest { private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration()); + private KubernetesMockServer mockServer; + private KubernetesClient kubernetesClient; @Test public void testSubmitAndCleanUp() throws Exception { @@ -420,4 +427,33 @@ public void testJobUpgradeIgnorePendingSavepoint() throws Exception { spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId()); assertEquals(JobState.SUSPENDED.name(), spSessionJob.getStatus().getJobStatus().getState()); } + + @Test + public void testSetSessionLabel() throws Exception { + TestingFlinkService flinkService = new TestingFlinkService(); + FlinkSessionJob sessionJob = TestUtils.buildSessionJob(); + sessionJob.getMetadata().getLabels().clear(); + + FlinkSessionJobReconciler reconciler = + new FlinkSessionJobReconciler(kubernetesClient, flinkService, configManager); + + var path = + String.format( + "/apis/%s/%s/namespaces/%s/%s/%s", + CrdConstants.API_GROUP, + CrdConstants.API_VERSION, + sessionJob.getMetadata().getNamespace(), + sessionJob.getPlural(), + sessionJob.getMetadata().getName()); + + mockServer.expect().get().withPath(path).andReturn(200, sessionJob).once(); + mockServer.expect().patch().withPath(path).andReturn(200, sessionJob).once(); + reconciler.reconcile(sessionJob, TestUtils.createEmptyContext()); + Assertions.assertEquals( + TestUtils.TEST_DEPLOYMENT_NAME, + sessionJob + .getMetadata() + .getLabels() + .get(FlinkSessionJobReconciler.LABEL_SESSION_CLUSTER)); + } }