Skip to content

Commit

Permalink
[FLINK-27613] Add label for the session job to help list the session …
Browse files Browse the repository at this point in the history
…jobs in the same session cluster
  • Loading branch information
Aitozi committed May 14, 2022
1 parent 85fc32a commit 916c6c1
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Optional;

/** The reconciler for the {@link FlinkSessionJob}. */
public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class);
public static final String LABEL_SESSION_CLUSTER = "target.session.name";

private final FlinkConfigManager configManager;
private final KubernetesClient kubernetesClient;
Expand Down Expand Up @@ -76,6 +78,24 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E
OperatorUtils.getSecondaryResource(
flinkSessionJob, context, configManager.getOperatorConfiguration());

var labels = flinkSessionJob.getMetadata().getLabels();
if (labels == null) {
labels = new HashMap<>();
}
if (!flinkSessionJob
.getSpec()
.getDeploymentName()
.equals(labels.get(LABEL_SESSION_CLUSTER))) {
labels.put(LABEL_SESSION_CLUSTER, flinkSessionJob.getSpec().getDeploymentName());
flinkSessionJob.getMetadata().setLabels(labels);
kubernetesClient
.resources(FlinkSessionJob.class)
.inNamespace(flinkSessionJob.getMetadata().getNamespace())
.withName(flinkSessionJob.getMetadata().getName())
.patch(flinkSessionJob);
return;
}

// if session cluster is not ready, we can't do reconcile for the job.
if (!helper.sessionClusterReady(flinkDepOptional)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.metrics.testutils.MetricListener;
Expand Down Expand Up @@ -131,6 +132,10 @@ public static FlinkSessionJob buildSessionJob() {
new ObjectMetaBuilder()
.withName(TEST_SESSION_JOB_NAME)
.withNamespace(TEST_NAMESPACE)
.withLabels(
Map.of(
FlinkSessionJobReconciler.LABEL_SESSION_CLUSTER,
TEST_DEPLOYMENT_NAME))
.build());
sessionJob.setSpec(
FlinkSessionJobSpec.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -43,9 +47,12 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Tests for {@link FlinkSessionJobReconciler}. */
@EnableKubernetesMockClient(crud = true)
public class FlinkSessionJobReconcilerTest {

private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
private KubernetesMockServer mockServer;
private KubernetesClient kubernetesClient;

@Test
public void testSubmitAndCleanUp() throws Exception {
Expand Down Expand Up @@ -420,4 +427,33 @@ public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
assertEquals(JobState.SUSPENDED.name(), spSessionJob.getStatus().getJobStatus().getState());
}

@Test
public void testSetSessionLabel() throws Exception {
TestingFlinkService flinkService = new TestingFlinkService();
FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
sessionJob.getMetadata().getLabels().clear();

FlinkSessionJobReconciler reconciler =
new FlinkSessionJobReconciler(kubernetesClient, flinkService, configManager);

var path =
String.format(
"/apis/%s/%s/namespaces/%s/%s/%s",
CrdConstants.API_GROUP,
CrdConstants.API_VERSION,
sessionJob.getMetadata().getNamespace(),
sessionJob.getPlural(),
sessionJob.getMetadata().getName());

mockServer.expect().get().withPath(path).andReturn(200, sessionJob).once();
mockServer.expect().patch().withPath(path).andReturn(200, sessionJob).once();
reconciler.reconcile(sessionJob, TestUtils.createEmptyContext());
Assertions.assertEquals(
TestUtils.TEST_DEPLOYMENT_NAME,
sessionJob
.getMetadata()
.getLabels()
.get(FlinkSessionJobReconciler.LABEL_SESSION_CLUSTER));
}
}

0 comments on commit 916c6c1

Please sign in to comment.