From 43da0d66adbb511a281c399feabb898fb1886093 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Sat, 11 Jun 2022 00:15:36 +0800 Subject: [PATCH] [FLINK-27613] Add label for the session job to help list the session jobs in the same session cluster --- .../kubernetes/operator/crd/CrdConstants.java | 2 + .../operator/admission/AdmissionHandler.java | 24 ++++- .../admission/FlinkOperatorWebhook.java | 4 +- .../kubernetes/operator/admission/Utils.java | 34 +++++++ .../mutator/DefaultRequestMutator.java | 94 +++++++++++++++++++ .../admission/mutator/FlinkMutator.java | 70 ++++++++++++++ .../admission/AdmissionHandlerTest.java | 56 ++++++++++- .../templates/webhook.yaml | 34 ++++++- 8 files changed, 310 insertions(+), 8 deletions(-) create mode 100644 flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/Utils.java create mode 100644 flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java create mode 100644 flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java index b699ad908e..d807d336f0 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java @@ -23,4 +23,6 @@ public class CrdConstants { public static final String API_VERSION = "v1beta1"; public static final String KIND_SESSION_JOB = "FlinkSessionJob"; public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment"; + + public static final String LABEL_TARGET_SESSION = "target.session"; } diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java index b204699bad..2a877c9156 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.admission; +import org.apache.flink.kubernetes.operator.admission.mutator.DefaultRequestMutator; + import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; @@ -40,6 +42,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview; import io.javaoperatorsdk.admissioncontroller.AdmissionController; +import io.javaoperatorsdk.admissioncontroller.mutation.Mutator; import io.javaoperatorsdk.admissioncontroller.validation.Validator; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; @@ -62,11 +65,14 @@ public class AdmissionHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class); private static final ObjectMapper objectMapper = new ObjectMapper(); protected static final String VALIDATE_REQUEST_PATH = "/validate"; + protected static final String MUTATOR_REQUEST_PATH = "/mutate"; private final AdmissionController validatingController; + private final AdmissionController mutatorController; - public AdmissionHandler(Validator validator) { + public AdmissionHandler(Validator validator, Mutator mutator) { this.validatingController = new AdmissionController<>(validator); + this.mutatorController = new AdmissionController<>(new DefaultRequestMutator(mutator)); } @Override @@ -85,11 +91,23 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) LOG.error("Failed to validate", e); sendError(ctx, ExceptionUtils.getStackTrace(e)); } + } else if (MUTATOR_REQUEST_PATH.equals(path)) { + final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + AdmissionReview review; + try { + InputStream in = new ByteBufInputStream(msgContent); + review = objectMapper.readValue(in, AdmissionReview.class); + AdmissionReview response = mutatorController.handle(review); + sendResponse(ctx, objectMapper.writeValueAsString(response)); + } catch (Exception e) { + LOG.error("Failed to mutate", e); + sendError(ctx, ExceptionUtils.getStackTrace(e)); + } } else { String error = String.format( - "Illegal path requested: %s. Only %s is accepted.", - path, VALIDATE_REQUEST_PATH); + "Illegal path requested: %s. Only %s or %s is accepted.", + path, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH); LOG.error(error); sendError(ctx, error); } diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java index 21a8a21ca6..a6b9960cce 100644 --- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.admission; +import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; @@ -61,7 +62,8 @@ public static void main(String[] args) throws Exception { FlinkConfigManager configManager = new FlinkConfigManager(); Set validators = ValidatorUtils.discoverValidators(configManager); AdmissionHandler endpoint = - new AdmissionHandler(new FlinkValidator(validators, configManager)); + new AdmissionHandler( + new FlinkValidator(validators, configManager), new FlinkMutator()); ChannelInitializer initializer = createChannelInitializer(endpoint); NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/Utils.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/Utils.java new file mode 100644 index 0000000000..11333785bb --- /dev/null +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/Utils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.admission; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.HasMetadata; + +/** Admission utils. */ +public class Utils { + + private static final ObjectMapper mapper = new ObjectMapper(); + + public static T convertToTargetType(HasMetadata resource, Class targetType) + throws JsonProcessingException { + var serialized = mapper.writeValueAsString(resource); + return mapper.readValue(serialized, targetType); + } +} diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java new file mode 100644 index 0000000000..65c3edccc6 --- /dev/null +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.admission.mutator; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.KubernetesResource; +import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest; +import io.fabric8.kubernetes.api.model.admission.v1.AdmissionResponse; +import io.fabric8.zjsonpatch.JsonDiff; +import io.javaoperatorsdk.admissioncontroller.AdmissionUtils; +import io.javaoperatorsdk.admissioncontroller.NotAllowedException; +import io.javaoperatorsdk.admissioncontroller.Operation; +import io.javaoperatorsdk.admissioncontroller.RequestHandler; +import io.javaoperatorsdk.admissioncontroller.clone.Cloner; +import io.javaoperatorsdk.admissioncontroller.clone.ObjectMapperCloner; +import io.javaoperatorsdk.admissioncontroller.mutation.Mutator; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +/** + * The default request mutator. It's copied from the {@link + * io.javaoperatorsdk.admissioncontroller.mutation.DefaultRequestMutator} with a modified path diff + * util to serialize out include non-null. + * + * @param + */ +public class DefaultRequestMutator implements RequestHandler { + private static final ObjectMapper mapper = + new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + private final Mutator mutator; + private final Cloner cloner; + + public DefaultRequestMutator(Mutator mutator) { + this(mutator, new ObjectMapperCloner<>()); + } + + public DefaultRequestMutator(Mutator mutator, Cloner cloner) { + this.mutator = mutator; + this.cloner = cloner; + } + + public AdmissionResponse handle(AdmissionRequest admissionRequest) { + Operation operation = Operation.valueOf(admissionRequest.getOperation()); + KubernetesResource originalResource = + AdmissionUtils.getTargetResource(admissionRequest, operation); + KubernetesResource clonedResource = + (KubernetesResource) this.cloner.clone((T) originalResource); + + AdmissionResponse admissionResponse; + try { + T mutatedResource = this.mutator.mutate((T) clonedResource, operation); + admissionResponse = admissionResponseFromMutation(originalResource, mutatedResource); + } catch (NotAllowedException e) { + admissionResponse = AdmissionUtils.notAllowedExceptionToAdmissionResponse(e); + } + + return admissionResponse; + } + + public static AdmissionResponse admissionResponseFromMutation( + KubernetesResource originalResource, KubernetesResource mutatedResource) { + AdmissionResponse admissionResponse = new AdmissionResponse(); + admissionResponse.setAllowed(true); + // It only allowed JSONPatch now, So we should avoid serialize out null value + // https://github.com/kubernetes/kubernetes/blob/3f1a9f9f3eaeae3d387b9152ea9aebb52be72319/pkg/apis/admission/types.go#L134 + admissionResponse.setPatchType("JSONPatch"); + JsonNode originalResNode = mapper.valueToTree(originalResource); + JsonNode mutatedResNode = mapper.valueToTree(mutatedResource); + JsonNode diff = JsonDiff.asJson(originalResNode, mutatedResNode); + String base64Diff = + Base64.getEncoder() + .encodeToString(diff.toString().getBytes(StandardCharsets.UTF_8)); + admissionResponse.setPatch(base64Diff); + return admissionResponse; + } +} diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java new file mode 100644 index 0000000000..ac35405f82 --- /dev/null +++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.admission.mutator; + +import org.apache.flink.kubernetes.operator.admission.Utils; +import org.apache.flink.kubernetes.operator.crd.CrdConstants; +import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.admissioncontroller.NotAllowedException; +import io.javaoperatorsdk.admissioncontroller.Operation; +import io.javaoperatorsdk.admissioncontroller.mutation.Mutator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; + +/** The default mutator. */ +public class FlinkMutator implements Mutator { + private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class); + + @Override + public HasMetadata mutate(HasMetadata resource, Operation operation) + throws NotAllowedException { + if (operation == Operation.CREATE) { + LOG.debug("Mutating resource {}", resource); + + if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) { + try { + var sessionJob = Utils.convertToTargetType(resource, FlinkSessionJob.class); + patchInternalLabel(sessionJob); + return sessionJob; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + return resource; + } + + private void patchInternalLabel(FlinkSessionJob flinkSessionJob) { + var labels = flinkSessionJob.getMetadata().getLabels(); + if (labels == null) { + labels = new HashMap<>(); + } + var deploymentName = flinkSessionJob.getSpec().getDeploymentName(); + if (deploymentName != null + && !deploymentName.equals(labels.get(CrdConstants.LABEL_TARGET_SESSION))) { + labels.put( + CrdConstants.LABEL_TARGET_SESSION, + flinkSessionJob.getSpec().getDeploymentName()); + flinkSessionJob.getMetadata().setLabels(labels); + } + } +} diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java index 71adc90cbc..9476bd008a 100644 --- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java +++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java @@ -17,12 +17,18 @@ package org.apache.flink.kubernetes.operator.admission; +import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.crd.CrdConstants; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledHeapByteBuf; import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -33,11 +39,14 @@ import io.fabric8.kubernetes.api.model.GroupVersionKind; import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest; import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Base64; import static io.javaoperatorsdk.admissioncontroller.Operation.CREATE; +import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH; import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.GET; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; @@ -53,7 +62,8 @@ public class AdmissionHandlerTest { new AdmissionHandler( new FlinkValidator( ValidatorUtils.discoverValidators(new FlinkConfigManager()), - new FlinkConfigManager())); + new FlinkConfigManager()), + new FlinkMutator()); @Test public void testHandleIllegalRequest() { @@ -65,8 +75,8 @@ public void testHandleIllegalRequest() { assertEquals(INTERNAL_SERVER_ERROR, response.status()); assertEquals( String.format( - "Illegal path requested: %s. Only %s is accepted.", - illegalRequest, VALIDATE_REQUEST_PATH), + "Illegal path requested: %s. Only %s or %s is accepted.", + illegalRequest, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH), new String(response.content().array())); assertTrue(embeddedChannel.finish()); } @@ -114,4 +124,44 @@ public void testHandleValidateRequestWithAdmissionReview() throws IOException { assertEquals(OK, response.status()); assertTrue(embeddedChannel.finish()); } + + @Test + public void testMutateHandler() throws Exception { + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler); + var sessionJob = new FlinkSessionJob(); + sessionJob.setSpec( + FlinkSessionJobSpec.builder() + .job(JobSpec.builder().jarURI("http://myjob.jar").build()) + .deploymentName("test-session") + .build()); + + final AdmissionRequest admissionRequest = new AdmissionRequest(); + admissionRequest.setOperation(CREATE.name()); + admissionRequest.setObject(sessionJob); + admissionRequest.setKind( + new GroupVersionKind( + sessionJob.getGroup(), sessionJob.getVersion(), sessionJob.getKind())); + final AdmissionReview admissionReview = new AdmissionReview(); + admissionReview.setRequest(admissionRequest); + embeddedChannel.writeInbound( + new DefaultFullHttpRequest( + HTTP_1_1, + GET, + MUTATOR_REQUEST_PATH, + Unpooled.wrappedBuffer( + new ObjectMapper() + .writeValueAsString(admissionReview) + .getBytes()))); + embeddedChannel.writeOutbound(new DefaultFullHttpResponse(HTTP_1_1, OK)); + final DefaultHttpResponse response = embeddedChannel.readOutbound(); + assertEquals(OK, response.status()); + Assertions.assertFalse(embeddedChannel.outboundMessages().isEmpty()); + var body = embeddedChannel.readOutbound(); + Assertions.assertNotNull(body); + var str = new String(((UnpooledHeapByteBuf) body).array()); + var review = new ObjectMapper().readValue(str, AdmissionReview.class); + var patch = new String(Base64.getDecoder().decode(review.getResponse().getPatch())); + Assertions.assertTrue(patch.contains(CrdConstants.LABEL_TARGET_SESSION)); + assertTrue(embeddedChannel.finish()); + } } diff --git a/helm/flink-kubernetes-operator/templates/webhook.yaml b/helm/flink-kubernetes-operator/templates/webhook.yaml index c0ecf67ac1..f8f001fa3a 100644 --- a/helm/flink-kubernetes-operator/templates/webhook.yaml +++ b/helm/flink-kubernetes-operator/templates/webhook.yaml @@ -109,4 +109,36 @@ webhooks: operator: In values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}] {{- end }} - {{- end }} +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + annotations: + cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert + name: flink-operator-{{ .Release.Namespace }}-webhook-configuration +webhooks: + - name: flinkoperator.flink.apache.org + admissionReviewVersions: ["v1"] + clientConfig: + service: + name: flink-operator-webhook-service + namespace: {{ .Release.Namespace }} + path: /mutate + failurePolicy: Fail + rules: + - apiGroups: ["*"] + apiVersions: ["*"] + scope: "Namespaced" + operations: + - CREATE + resources: + - flinksessionjobs + sideEffects: None + {{- if .Values.watchNamespaces }} + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: In + values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}] + {{- end }} +{{- end }}