src: Use jobs not plain pods for auxiliary tasks #1857

Merged: 2 commits, Jul 24, 2023
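The change in summary: the auxiliary socat dialer and the volume-uploader helper are now created as batch/v1 Jobs that wrap the former pod spec in a pod template, instead of as bare Pods. Below is a minimal sketch of the new shape, mirroring the diff that follows; the package, function name, and parameters are illustrative and not part of this PR.

```go
package sketch

import (
	"context"

	batchV1 "k8s.io/api/batch/v1"
	coreV1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createHelperJob wraps an auxiliary container in a Job's pod template rather
// than creating a bare Pod, which is the pattern adopted in this PR.
func createHelperJob(ctx context.Context, client kubernetes.Interface, namespace, name, image string) (*batchV1.Job, error) {
	job := &batchV1.Job{
		ObjectMeta: metaV1.ObjectMeta{Name: name},
		Spec: batchV1.JobSpec{
			Template: coreV1.PodTemplateSpec{
				Spec: coreV1.PodSpec{
					Containers: []coreV1.Container{{
						Name:      "container",
						Image:     image,
						Stdin:     true,
						StdinOnce: true,
						Command:   []string{"socat", "-u", "-", "OPEN:/dev/null"},
					}},
					// A Job's pod template must use RestartPolicy Never or OnFailure.
					RestartPolicy: coreV1.RestartPolicyNever,
				},
			},
		},
	}
	return client.BatchV1().Jobs(namespace).Create(ctx, job, metaV1.CreateOptions{})
}
```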
96 changes: 59 additions & 37 deletions pkg/k8s/dialer.go
@@ -16,13 +16,14 @@ import (
"syscall"
"time"

batchV1 "k8s.io/api/batch/v1"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -64,7 +65,9 @@ func NewInClusterDialer(ctx context.Context, clientConfig clientcmd.ClientConfig
type contextDialer struct {
coreV1 v1.CoreV1Interface
clientConfig clientcmd.ClientConfig
batchV1 batchv1.BatchV1Interface
restConf *restclient.Config
jobName string
podName string
namespace string
detachChan chan struct{}
@@ -185,9 +188,13 @@ func (c *contextDialer) Close() error {
close(c.detachChan)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
delOpts := metaV1.DeleteOptions{}

return c.coreV1.Pods(c.namespace).Delete(ctx, c.podName, delOpts)
pp := metaV1.DeletePropagationForeground
delOpts := metaV1.DeleteOptions{
PropagationPolicy: &pp,
}

return c.batchV1.Jobs(c.namespace).Delete(ctx, c.jobName, delOpts)
}

func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
@@ -206,56 +213,66 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
if err != nil {
return
}

c.coreV1 = client.CoreV1()
c.batchV1 = client.BatchV1()

c.namespace, _, err = c.clientConfig.Namespace()
if err != nil {
return
}

pods := client.CoreV1().Pods(c.namespace)

c.podName = "in-cluster-dialer-" + rand.String(5)
jobs := client.BatchV1().Jobs(c.namespace)

defer func() {
if err != nil {
c.Close()
}
}()

pod := &coreV1.Pod{
c.jobName = "in-cluster-dialer-" + rand.String(5)

job := &batchV1.Job{
ObjectMeta: metaV1.ObjectMeta{
Name: c.podName,
Labels: nil,
Annotations: nil,
Name: c.jobName,
},
Spec: coreV1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []coreV1.Container{
{
Name: c.podName,
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(client),
Spec: batchV1.JobSpec{
Template: coreV1.PodTemplateSpec{
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: "container",
Image: SocatImage,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
},
DNSPolicy: coreV1.DNSClusterFirst,
RestartPolicy: coreV1.RestartPolicyNever,
},
}

creatOpts := metaV1.CreateOptions{}

ready := podReady(ctx, c.coreV1, c.podName, c.namespace)
podChan, err := podReady(ctx, c.coreV1, c.jobName, c.namespace)
if err != nil {
return fmt.Errorf("cannot setup pod watch: %w", err)
}

_, err = pods.Create(ctx, pod, creatOpts)
_, err = jobs.Create(ctx, job, creatOpts)
if err != nil {
return
}

select {
case err = <-ready:
case poe := <-podChan:
if poe.err != nil {
return poe.err
}
c.podName = poe.pod.Name
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 1):
@@ -293,7 +310,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
SubResource("exec")
req.VersionedParams(&coreV1.PodExecOptions{
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
Container: c.podName,
Container: "container",
Stdin: true,
Stdout: true,
Stderr: true,
@@ -320,7 +337,7 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: podName,
Container: "container",
Stdin: true,
Stdout: true,
Stderr: true,
@@ -340,26 +357,30 @@ func attach(restClient restclient.Interface, restConf *restclient.Config, podNam
})
}

func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d
type podOrError struct {
pod *coreV1.Pod
err error
}

func podReady(ctx context.Context, core v1.CoreV1Interface, jobName, namespace string) (result <-chan podOrError, err error) {
outChan := make(chan podOrError, 1)
result = outChan

pods := core.Pods(namespace)

nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
FieldSelector: nameSelector,
LabelSelector: "job-name=" + jobName,
}
watcher, err := pods.Watch(ctx, listOpts)
if err != nil {
return
return nil, err
}

go func() {
defer watcher.Stop()
ch := watcher.ResultChan()
for event := range ch {
watchChan := watcher.ResultChan()
for event := range watchChan {
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
@@ -368,7 +389,7 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s
if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
d <- nil
outChan <- podOrError{pod: pod}
return
}
if status.State.Waiting != nil {
@@ -379,9 +400,10 @@ func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace s
"InvalidImageName",
"CrashLoopBackOff",
"ImagePullBackOff":
d <- fmt.Errorf("reason: %v, message: %v",
e := fmt.Errorf("reason: %v, message: %v",
status.State.Waiting.Reason,
status.State.Waiting.Message)
outChan <- podOrError{err: e}
return
default:
continue
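A note on the podReady change above: a Pod created by a Job gets a generated name, so it can no longer be watched via a metadata.name field selector; the watch now selects on the job-name label that the Job controller applies to the pods it creates. A small sketch of the same lookup done as a plain list, with illustrative names, assuming a typed clientset:

```go
package sketch

import (
	"context"
	"fmt"

	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// podsForJob lists the Pods spawned by a Job using the standard "job-name"
// label that the Job controller stamps onto each pod it creates.
func podsForJob(ctx context.Context, client kubernetes.Interface, namespace, jobName string) error {
	podList, err := client.CoreV1().Pods(namespace).List(ctx, metaV1.ListOptions{
		LabelSelector: "job-name=" + jobName,
	})
	if err != nil {
		return err
	}
	for _, p := range podList.Items {
		fmt.Println("job pod:", p.Name, p.Status.Phase)
	}
	return nil
}
```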
81 changes: 48 additions & 33 deletions pkg/k8s/persistent_volumes.go
@@ -12,6 +12,7 @@ import (
"syscall"
"time"

batchV1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -101,65 +102,79 @@ func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []str
return fmt.Errorf("cannot get namespace: %w", err)
}

podName := "volume-uploader-" + rand.String(5)
jobName := "volume-uploader-" + rand.String(5)

pods := client.CoreV1().Pods(namespace)
jobs := client.BatchV1().Jobs(namespace)

defer func() {
_ = pods.Delete(ctx, podName, metav1.DeleteOptions{})
pp := metav1.DeletePropagationForeground
delOpts := metav1.DeleteOptions{
PropagationPolicy: &pp,
}
_ = jobs.Delete(ctx, jobName, delOpts)
}()

const volumeMntPoint = "/tmp/volume_mnt"
const pVol = "p-vol"
pod := &corev1.Pod{
job := &batchV1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: nil,
Annotations: nil,
Name: jobName,
},
Spec: corev1.PodSpec{
SecurityContext: defaultPodSecurityContext(),
Containers: []corev1.Container{
{
Name: podName,
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
Spec: batchV1.JobSpec{

Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: pVol,
MountPath: volumeMntPoint,
Name: "container",
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: pVol,
MountPath: volumeMntPoint,
},
},
},
},
SecurityContext: defaultSecurityContext(client),
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
},
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
}

localCtx, cancel := context.WithCancel(ctx)
defer cancel()
ready := podReady(localCtx, client.CoreV1(), podName, namespace)
podChan, err := podReady(localCtx, client.CoreV1(), jobName, namespace)
if err != nil {
return fmt.Errorf("cannot setup pod watch: %w", err)
}

_, err = pods.Create(ctx, pod, metav1.CreateOptions{})
_, err = jobs.Create(ctx, job, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("cannot create pod: %w", err)
}

var podName string
select {
case err = <-ready:
case poe := <-podChan:
if poe.err != nil {
return poe.err
}
podName = poe.pod.Name
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 5):
39 changes: 0 additions & 39 deletions pkg/k8s/security_context.go

This file was deleted.
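Design note on cleanup: since the helper Pod is now owned by its Job, both call sites delete the Job with a foreground propagation policy, which asks the API server to remove the dependent Pod before the Job object itself disappears; deleting a Job without a propagation policy can leave its Pods orphaned. A minimal sketch of that cleanup path, with illustrative names, assuming a typed clientset:

```go
package sketch

import (
	"context"

	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteJobAndPods removes a Job and, through foreground cascading deletion,
// the Pods it owns via ownerReferences.
func deleteJobAndPods(ctx context.Context, client kubernetes.Interface, namespace, jobName string) error {
	pp := metaV1.DeletePropagationForeground
	return client.BatchV1().Jobs(namespace).Delete(ctx, jobName, metaV1.DeleteOptions{
		PropagationPolicy: &pp,
	})
}
```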
