Skip to content

Commit

Permalink
Fix pod security context (#1889)
Browse files Browse the repository at this point in the history
* Revert "src: Use jobs not plain pods for auxiliary tasks (#1857)"

This reverts commit cb6f33d.

* refactor: move code from openshift

This is needed to avoid circular package dependencies.

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* fix: set pod SC only on non-OpenShift

Signed-off-by: Matej Vasek <mvasek@redhat.com>

---------

Signed-off-by: Matej Vasek <mvasek@redhat.com>
  • Loading branch information
matejvasek committed Jul 26, 2023
1 parent a2834c2 commit a270f9e
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 215 deletions.
12 changes: 6 additions & 6 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"knative.dev/func/pkg/docker/creds"
fn "knative.dev/func/pkg/functions"
fnhttp "knative.dev/func/pkg/http"
"knative.dev/func/pkg/k8s"
"knative.dev/func/pkg/knative"
"knative.dev/func/pkg/openshift"
"knative.dev/func/pkg/pipelines/tekton"
"knative.dev/func/pkg/progress"
)
Expand Down Expand Up @@ -103,7 +103,7 @@ func NewClient(cfg ClientConfig, options ...fn.Option) (*fn.Client, func()) {
// newTransport returns a transport with cluster-flavor-specific variations
// which take advantage of additional features offered by cluster variants.
func newTransport(insecureSkipVerify bool) fnhttp.RoundTripCloser {
return fnhttp.NewRoundTripper(fnhttp.WithInsecureSkipVerify(insecureSkipVerify), openshift.WithOpenShiftServiceCA())
return fnhttp.NewRoundTripper(fnhttp.WithInsecureSkipVerify(insecureSkipVerify), fnhttp.WithOpenShiftServiceCA())
}

// newCredentialsProvider returns a credentials provider which possibly
Expand All @@ -114,7 +114,7 @@ func newCredentialsProvider(configPath string, t http.RoundTripper) docker.Crede
creds.WithPromptForCredentials(prompt.NewPromptForCredentials(os.Stdin, os.Stdout, os.Stderr)),
creds.WithPromptForCredentialStore(prompt.NewPromptForCredentialStore()),
creds.WithTransport(t),
creds.WithAdditionalCredentialLoaders(openshift.GetDockerCredentialLoaders()...),
creds.WithAdditionalCredentialLoaders(k8s.GetOpenShiftDockerCredentialLoaders()...),
}

// Other cluster variants can be supported here
Expand Down Expand Up @@ -144,18 +144,18 @@ func newKnativeDeployer(namespace string, verbose bool) fn.Deployer {
}

type deployDecorator struct {
oshDec openshift.OpenshiftMetadataDecorator
oshDec k8s.OpenshiftMetadataDecorator
}

func (d deployDecorator) UpdateAnnotations(function fn.Function, annotations map[string]string) map[string]string {
if openshift.IsOpenShift() {
if k8s.IsOpenShift() {
return d.oshDec.UpdateAnnotations(function, annotations)
}
return annotations
}

func (d deployDecorator) UpdateLabels(function fn.Function, labels map[string]string) map[string]string {
if openshift.IsOpenShift() {
if k8s.IsOpenShift() {
return d.oshDec.UpdateLabels(function, labels)
}
return labels
Expand Down
5 changes: 2 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"knative.dev/func/pkg/builders"
fn "knative.dev/func/pkg/functions"
"knative.dev/func/pkg/k8s"
"knative.dev/func/pkg/openshift"
)

const (
Expand Down Expand Up @@ -77,8 +76,8 @@ func (c Global) RegistryDefault() string {
return c.Registry
}
switch {
case openshift.IsOpenShift():
return openshift.GetDefaultRegistry()
case k8s.IsOpenShift():
return k8s.GetDefaultOpenShiftRegistry()
default:
return ""
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/http/openshift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package http

import (
"context"
"crypto/x509"
"fmt"
"strings"
"sync"

"knative.dev/func/pkg/k8s"
)

const openShiftRegistryHost = "image-registry.openshift-image-registry.svc"

// WithOpenShiftServiceCA enables trust to OpenShift's service CA for internal image registry
func WithOpenShiftServiceCA() Option {
	var (
		once    sync.Once
		cert    *x509.Certificate
		certErr error
	)

	return WithSelectCA(func(ctx context.Context, serverName string) (*x509.Certificate, error) {
		// Only the in-cluster image registry gets the service CA; any other
		// server falls through to the default trust store.
		if !strings.HasPrefix(serverName, openShiftRegistryHost) {
			return nil, nil
		}
		// Fetch the CA at most once; the certificate (or the failure) is
		// cached for the lifetime of this Option. sync.Once guarantees every
		// caller observes the values written inside Do.
		once.Do(func() {
			cert, certErr = k8s.GetOpenShiftServiceCA(ctx)
			if certErr != nil {
				certErr = fmt.Errorf("cannot get CA: %w", certErr)
			}
		})
		if certErr != nil {
			return nil, certErr
		}
		return cert, nil
	})
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
//go:build integration
// +build integration

package openshift_test
package http_test

import (
"net/http"

"testing"

fnhttp "knative.dev/func/pkg/http"
"knative.dev/func/pkg/openshift"
"knative.dev/func/pkg/k8s"
)

func TestRoundTripper(t *testing.T) {
if !openshift.IsOpenShift() {
if !k8s.IsOpenShift() {
t.Skip("The cluster in not an instance of OpenShift.")
return
}

transport := fnhttp.NewRoundTripper(openshift.WithOpenShiftServiceCA())
transport := fnhttp.NewRoundTripper(fnhttp.WithOpenShiftServiceCA())
defer transport.Close()

client := http.Client{
Expand Down
96 changes: 37 additions & 59 deletions pkg/k8s/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ import (
"syscall"
"time"

batchV1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -65,9 +64,7 @@ func NewInClusterDialer(ctx context.Context, clientConfig clientcmd.ClientConfig
type contextDialer struct {
coreV1 v1.CoreV1Interface
clientConfig clientcmd.ClientConfig
batchV1 batchv1.BatchV1Interface
restConf *restclient.Config
jobName string
podName string
namespace string
detachChan chan struct{}
Expand Down Expand Up @@ -188,13 +185,9 @@ func (c *contextDialer) Close() error {
close(c.detachChan)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
delOpts := metaV1.DeleteOptions{}

pp := metaV1.DeletePropagationForeground
delOpts := metaV1.DeleteOptions{
PropagationPolicy: &pp,
}

return c.batchV1.Jobs(c.namespace).Delete(ctx, c.jobName, delOpts)
return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts)
}

func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
Expand All @@ -213,66 +206,56 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
if err != nil {
return
}

c.coreV1 = client.CoreV1()
c.batchV1 = client.BatchV1()

c.namespace, _, err = c.clientConfig.Namespace()
if err != nil {
return
}

jobs := client.BatchV1().Jobs(c.namespace)
pods := client.CoreV1().Pods(c.namespace)

c.podName = "in-cluster-dialer-" + rand.String(5)

defer func() {
if err != nil {
c.Close()
}
}()

c.jobName = "in-cluster-dialer-" + rand.String(5)

job := &batchV1.Job{
pod := &coreV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: c.jobName,
Name: c.podName,
Labels: nil,
Annotations: nil,
},
Spec: batchV1.JobSpec{
Template: coreV1.PodTemplateSpec{
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: "container",
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
Spec: coreV1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []coreV1.Container{
{
Name: c.podName,
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(client),
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
}

creatOpts := metaV1.CreateOptions{}

podChan, err := podReady(ctx, c.coreV1, c.jobName, c.namespace)
if err != nil {
return fmt.Errorf("cannot setup pod watch: %w", err)
}
ready := podReady(ctx, c.coreV1, c.podName, c.namespace)

_, err = jobs.Create(ctx, job, creatOpts)
_, err = pods.Create(ctx, pod, creatOpts)
if err != nil {
return
}

select {
case poe := <-podChan:
if poe.err != nil {
return poe.err
}
c.podName = poe.pod.Name
case err = <-ready:
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 1):
Expand Down Expand Up @@ -310,7 +293,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
SubResource("exec")
req.VersionedParams(&coreV1.PodExecOptions{
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
Container: "container",
Container: c.podName,
Stdin: true,
Stdout: true,
Stderr: true,
Expand All @@ -337,7 +320,7 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: "container",
Container: podName,
Stdin: true,
Stdout: true,
Stderr: true,
Expand All @@ -357,30 +340,26 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
})
}

type podOrError struct {
pod *coreV1.Pod
err error
}

func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace string) (result <-chan podOrError, err error) {
outChan := make(chan podOrError, 1)
result = outChan
func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d

pods := core.Pods(namespace)

nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
LabelSelector: "job-name=" + jobName,
FieldSelector: nameSelector,
}
watcher, err := pods.Watch(ctx, listOpts)
if err != nil {
return nil, err
return
}

go func() {
defer watcher.Stop()
watchChan := watcher.ResultChan()
for event := range watchChan {
ch := watcher.ResultChan()
for event := range ch {
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
Expand All @@ -389,7 +368,7 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace s
if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
outChan <- podOrError{pod: pod}
d <- nil
return
}
if status.State.Waiting != nil {
Expand All @@ -400,10 +379,9 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace s
"InvalidImageName",
"CrashLoopBackOff",
"ImagePullBackOff":
e := fmt.Errorf("reason: %v, message: %v",
d <- fmt.Errorf("reason: %v, message: %v",
status.State.Waiting.Reason,
status.State.Waiting.Message)
outChan <- podOrError{err: e}
return
default:
continue
Expand Down
Loading

0 comments on commit a270f9e

Please sign in to comment.