Skip to content

Commit

Permalink
[FLINK-27613] Add label for the session job to help list the session …
Browse files Browse the repository at this point in the history
…cluster
  • Loading branch information
Aitozi committed Jun 14, 2022
1 parent 42f289a commit bf60428
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 10 deletions.
3 changes: 2 additions & 1 deletion docs/content/docs/operations/helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ The configurable parameters of the Helm chart and which default values as detail
| podSecurityContext | Defines privilege and access control settings for a pod or container for pod security context. | runAsUser: 9999<br/>runAsGroup: 9999 |
| operatorSecurityContext | Defines privilege and access control settings for a pod or container for operator security context. | |
| webhookSecurityContext | Defines privilege and access control settings for a pod or container for webhook security context. | |
| webhook.create | Whether to enable webhook to create for flink-kubernetes-operator. | true |
| webhook.create | Whether to enable webhook validator to create for flink-kubernetes-operator. | true |
| wenhook.mutator.create | Whether to enable webhook mutator to create for flink-kubernetes-operator. | True |
| webhook.keystore | The ConfigMap of webhook key store. | useDefaultPassword: true |
| defaultConfiguration.create | Whether to enable default configuration to create for flink-kubernetes-operator. | true |
| defaultConfiguration.append | Whether to append configuration files with configs. | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public class CrdConstants {
public static final String API_VERSION = "v1beta1";
public static final String KIND_SESSION_JOB = "FlinkSessionJob";
public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";

public static final String LABEL_TARGET_SESSION = "target.session";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.kubernetes.operator.admission;

import org.apache.flink.kubernetes.operator.admission.mutator.DefaultRequestMutator;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
Expand All @@ -40,6 +42,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
import io.javaoperatorsdk.admissioncontroller.AdmissionController;
import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
import io.javaoperatorsdk.admissioncontroller.validation.Validator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
Expand All @@ -62,11 +65,14 @@ public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
protected static final String VALIDATE_REQUEST_PATH = "/validate";
protected static final String MUTATOR_REQUEST_PATH = "/mutate";

private final AdmissionController<HasMetadata> validatingController;
private final AdmissionController<HasMetadata> mutatorController;

public AdmissionHandler(Validator<HasMetadata> validator) {
public AdmissionHandler(Validator<HasMetadata> validator, Mutator<HasMetadata> mutator) {
this.validatingController = new AdmissionController<>(validator);
this.mutatorController = new AdmissionController<>(new DefaultRequestMutator(mutator));
}

@Override
Expand All @@ -85,11 +91,23 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest)
LOG.error("Failed to validate", e);
sendError(ctx, ExceptionUtils.getStackTrace(e));
}
} else if (MUTATOR_REQUEST_PATH.equals(path)) {
final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
AdmissionReview review;
try {
InputStream in = new ByteBufInputStream(msgContent);
review = objectMapper.readValue(in, AdmissionReview.class);
AdmissionReview response = mutatorController.handle(review);
sendResponse(ctx, objectMapper.writeValueAsString(response));
} catch (Exception e) {
LOG.error("Failed to mutate", e);
sendError(ctx, ExceptionUtils.getStackTrace(e));
}
} else {
String error =
String.format(
"Illegal path requested: %s. Only %s is accepted.",
path, VALIDATE_REQUEST_PATH);
"Illegal path requested: %s. Only %s or %s is accepted.",
path, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH);
LOG.error(error);
sendError(ctx, error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.kubernetes.operator.admission;

import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
Expand Down Expand Up @@ -61,7 +62,8 @@ public static void main(String[] args) throws Exception {
FlinkConfigManager configManager = new FlinkConfigManager();
Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
AdmissionHandler endpoint =
new AdmissionHandler(new FlinkValidator(validators, configManager));
new AdmissionHandler(
new FlinkValidator(validators, configManager), new FlinkMutator());
ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.admission.mutator;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.KubernetesResource;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionResponse;
import io.fabric8.zjsonpatch.JsonDiff;
import io.javaoperatorsdk.admissioncontroller.AdmissionUtils;
import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
import io.javaoperatorsdk.admissioncontroller.Operation;
import io.javaoperatorsdk.admissioncontroller.RequestHandler;
import io.javaoperatorsdk.admissioncontroller.clone.Cloner;
import io.javaoperatorsdk.admissioncontroller.clone.ObjectMapperCloner;
import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* The default request mutator. It's copied from the {@link
* io.javaoperatorsdk.admissioncontroller.mutation.DefaultRequestMutator} with a modified path diff
* util to serialize out include non-null.
*
* @param <T>
*/
public class DefaultRequestMutator<T extends KubernetesResource> implements RequestHandler {
private static final ObjectMapper mapper =
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
private final Mutator<T> mutator;
private final Cloner<T> cloner;

public DefaultRequestMutator(Mutator<T> mutator) {
this(mutator, new ObjectMapperCloner<>());
}

public DefaultRequestMutator(Mutator<T> mutator, Cloner<T> cloner) {
this.mutator = mutator;
this.cloner = cloner;
}

public AdmissionResponse handle(AdmissionRequest admissionRequest) {
Operation operation = Operation.valueOf(admissionRequest.getOperation());
KubernetesResource originalResource =
AdmissionUtils.getTargetResource(admissionRequest, operation);
KubernetesResource clonedResource =
(KubernetesResource) this.cloner.clone((T) originalResource);

AdmissionResponse admissionResponse;
try {
T mutatedResource = this.mutator.mutate((T) clonedResource, operation);
admissionResponse = admissionResponseFromMutation(originalResource, mutatedResource);
} catch (NotAllowedException e) {
admissionResponse = AdmissionUtils.notAllowedExceptionToAdmissionResponse(e);
}

return admissionResponse;
}

public static AdmissionResponse admissionResponseFromMutation(
KubernetesResource originalResource, KubernetesResource mutatedResource) {
AdmissionResponse admissionResponse = new AdmissionResponse();
admissionResponse.setAllowed(true);
// It only allowed JSONPatch now, So we should avoid serialize out null value
// https://github.com/kubernetes/kubernetes/blob/3f1a9f9f3eaeae3d387b9152ea9aebb52be72319/pkg/apis/admission/types.go#L134
admissionResponse.setPatchType("JSONPatch");
JsonNode originalResNode = mapper.valueToTree(originalResource);
JsonNode mutatedResNode = mapper.valueToTree(mutatedResource);
JsonNode diff = JsonDiff.asJson(originalResNode, mutatedResNode);
String base64Diff =
Base64.getEncoder()
.encodeToString(diff.toString().getBytes(StandardCharsets.UTF_8));
admissionResponse.setPatch(base64Diff);
return admissionResponse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.admission.mutator;

import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
import io.javaoperatorsdk.admissioncontroller.Operation;
import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

/** The default mutator. */
public class FlinkMutator implements Mutator<HasMetadata> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
private static final ObjectMapper mapper = new ObjectMapper();

@Override
public HasMetadata mutate(HasMetadata resource, Operation operation)
throws NotAllowedException {
if (operation == Operation.CREATE) {
LOG.debug("Mutating resource {}", resource);

if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
try {
var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
setSessionTargetLabel(sessionJob);
return sessionJob;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
return resource;
}

private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
var labels = flinkSessionJob.getMetadata().getLabels();
if (labels == null) {
labels = new HashMap<>();
}
var deploymentName = flinkSessionJob.getSpec().getDeploymentName();
if (deploymentName != null
&& !deploymentName.equals(labels.get(CrdConstants.LABEL_TARGET_SESSION))) {
labels.put(
CrdConstants.LABEL_TARGET_SESSION,
flinkSessionJob.getSpec().getDeploymentName());
flinkSessionJob.getMetadata().setLabels(labels);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package org.apache.flink.kubernetes.operator.admission;

import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledHeapByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
Expand All @@ -33,11 +39,14 @@
import io.fabric8.kubernetes.api.model.GroupVersionKind;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Base64;

import static io.javaoperatorsdk.admissioncontroller.Operation.CREATE;
import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH;
import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.GET;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
Expand All @@ -53,7 +62,8 @@ public class AdmissionHandlerTest {
new AdmissionHandler(
new FlinkValidator(
ValidatorUtils.discoverValidators(new FlinkConfigManager()),
new FlinkConfigManager()));
new FlinkConfigManager()),
new FlinkMutator());

@Test
public void testHandleIllegalRequest() {
Expand All @@ -65,8 +75,8 @@ public void testHandleIllegalRequest() {
assertEquals(INTERNAL_SERVER_ERROR, response.status());
assertEquals(
String.format(
"Illegal path requested: %s. Only %s is accepted.",
illegalRequest, VALIDATE_REQUEST_PATH),
"Illegal path requested: %s. Only %s or %s is accepted.",
illegalRequest, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH),
new String(response.content().array()));
assertTrue(embeddedChannel.finish());
}
Expand Down Expand Up @@ -114,4 +124,44 @@ public void testHandleValidateRequestWithAdmissionReview() throws IOException {
assertEquals(OK, response.status());
assertTrue(embeddedChannel.finish());
}

@Test
public void testMutateHandler() throws Exception {
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
var sessionJob = new FlinkSessionJob();
sessionJob.setSpec(
FlinkSessionJobSpec.builder()
.job(JobSpec.builder().jarURI("http://myjob.jar").build())
.deploymentName("test-session")
.build());

final AdmissionRequest admissionRequest = new AdmissionRequest();
admissionRequest.setOperation(CREATE.name());
admissionRequest.setObject(sessionJob);
admissionRequest.setKind(
new GroupVersionKind(
sessionJob.getGroup(), sessionJob.getVersion(), sessionJob.getKind()));
final AdmissionReview admissionReview = new AdmissionReview();
admissionReview.setRequest(admissionRequest);
embeddedChannel.writeInbound(
new DefaultFullHttpRequest(
HTTP_1_1,
GET,
MUTATOR_REQUEST_PATH,
Unpooled.wrappedBuffer(
new ObjectMapper()
.writeValueAsString(admissionReview)
.getBytes())));
embeddedChannel.writeOutbound(new DefaultFullHttpResponse(HTTP_1_1, OK));
final DefaultHttpResponse response = embeddedChannel.readOutbound();
assertEquals(OK, response.status());
Assertions.assertFalse(embeddedChannel.outboundMessages().isEmpty());
var body = embeddedChannel.readOutbound();
Assertions.assertNotNull(body);
var str = new String(((UnpooledHeapByteBuf) body).array());
var review = new ObjectMapper().readValue(str, AdmissionReview.class);
var patch = new String(Base64.getDecoder().decode(review.getResponse().getPatch()));
Assertions.assertTrue(patch.contains(CrdConstants.LABEL_TARGET_SESSION));
assertTrue(embeddedChannel.finish());
}
}
Loading

0 comments on commit bf60428

Please sign in to comment.