Skip to content

Commit

Permalink
On cluster build without VCS (knative#1298)
Browse files Browse the repository at this point in the history
* refactor: attach() and podReady() more general

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* feat: added function for upload to a volume

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* refactor: extract defaultSecurityContext

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* fix: added runtime type check

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* feat: allow on-cluster-build without VCS

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* fix: reword error message

Co-authored-by: Lance Ball <lball@redhat.com>

* fix: runtime type check

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* test: e2e test for direct upload on-cluster-build

Signed-off-by: Matej Vasek <mvasek@redhat.com>

Signed-off-by: Matej Vasek <mvasek@redhat.com>
Co-authored-by: Lance Ball <lball@redhat.com>
  • Loading branch information
matejvasek and lance committed Feb 15, 2023
1 parent 55fd132 commit 145e473
Show file tree
Hide file tree
Showing 16 changed files with 558 additions and 58 deletions.
9 changes: 0 additions & 9 deletions cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ func runDeploy(cmd *cobra.Command, _ []string, newClient ClientFactory) (err err

// Perform the deployment either remote or local.
if config.Remote {
if f.Build.Git.URL == "" {
return ErrURLRequired // Provides CLI-specific help text
}
// Invoke a remote build/push/deploy pipeline
// Returned is the function with fields like Registry and Image populated.
if f, err = client.RunPipeline(cmd.Context(), f); err != nil {
Expand Down Expand Up @@ -736,9 +733,3 @@ var ErrRegistryRequired = errors.New(`A container registry is required. For exa
--registry docker.io/myusername
To run the command in an interactive mode, use --confirm (-c)`)

var ErrURLRequired = errors.New(`The function is not associated with a Git repository, and needs one in order to perform a remote deployment. For example:
--remote --git-url=https://git.example.com/namespace/myFunction
To run the deploy command in an interactive mode, use --confirm (-c)`)
9 changes: 0 additions & 9 deletions cmd/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,6 @@ func TestDeploy_RemoteBuildURLPermutations(t *testing.T) {
if remote != "" && remote != "false" { // default "" is == false.
// REMOTE Assertions

// TODO: (enhancement) allow triggering remote deploy without Git.
// This would tar up the local filesystem and send it to the cluster
// build and deploy. For now URL is required when triggering remote.
if url == "" && err == nil {
t.Fatal("error expected when --remote without a --git-url")
} else {
return // test successfully confirmed this error case
}

if !pipeliner.RunInvoked { // Remote deployer should be triggered
t.Error("remote was not invoked")
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ require (
knative.dev/serving v0.34.1-0.20221005094629-080aaa5c6241
)

require github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06

require (
cloud.google.com/go/compute v1.10.0 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
Expand Down Expand Up @@ -189,7 +191,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
Expand Down
45 changes: 20 additions & 25 deletions k8s/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
img = i
}

runAsNonRoot := true
pod := &coreV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: c.podName,
Expand All @@ -144,17 +143,12 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: c.podName,
Image: img,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: &coreV1.SecurityContext{
Privileged: new(bool),
AllowPrivilegeEscalation: new(bool),
RunAsNonRoot: &runAsNonRoot,
Capabilities: &coreV1.Capabilities{Drop: []coreV1.Capability{"ALL"}},
},
Name: c.podName,
Image: img,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(),
},
},
DNSPolicy: coreV1.DNSClusterFirst,
Expand All @@ -163,7 +157,7 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
}
creatOpts := metaV1.CreateOptions{}

ready := c.podReady(ctx)
ready := podReady(ctx, c.coreV1, c.podName, c.namespace)

_, err = pods.Create(ctx, pod, creatOpts)
if err != nil {
Expand All @@ -184,7 +178,7 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {

// attaching to the stdin to automatically Complete the pod on exit
go func() {
_ = c.attach(emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
_ = attach(c.coreV1.RESTClient(), c.restConf, c.podName, c.namespace, emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
}()

return nil
Expand Down Expand Up @@ -229,23 +223,21 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
})
}

func (c *contextDialer) attach(in io.Reader, out, errOut io.Writer) error {

restClient := c.coreV1.RESTClient()
func attach(restClient restclient.Interface, restConf *restclient.Config, podName, namespace string, in io.Reader, out, errOut io.Writer) error {
req := restClient.Post().
Resource("pods").
Name(c.podName).
Namespace(c.namespace).
Name(podName).
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: c.podName,
Container: podName,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)

executor, err := remotecommand.NewSPDYExecutor(c.restConf, "POST", req.URL())
executor, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
if err != nil {
return err
}
Expand All @@ -258,13 +250,13 @@ func (c *contextDialer) attach(in io.Reader, out, errOut io.Writer) error {
})
}

func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d

pods := c.coreV1.Pods(c.namespace)
pods := core.Pods(namespace)

nameSelector := fields.OneTermEqualSelector("metadata.name", c.podName).String()
nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
FieldSelector: nameSelector,
Expand All @@ -278,7 +270,10 @@ func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
defer watcher.Stop()
ch := watcher.ResultChan()
for event := range ch {
pod := event.Object.(*coreV1.Pod)
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
}

if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
Expand Down
173 changes: 173 additions & 0 deletions k8s/persistent_volumes.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package k8s

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)

func GetPersistentVolumeClaim(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
Expand Down Expand Up @@ -51,3 +62,165 @@ func DeletePersistentVolumeClaims(ctx context.Context, namespaceOverride string,

return client.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
}

var TarImage = "quay.io/boson/alpine-socat:1.7.4.3-r1-non-root"

// UploadToVolume uploads files (passed in form of tar stream) into volume.
func UploadToVolume(ctx context.Context, content io.Reader, claimName, namespace string) error {
return runWithVolumeMounted(ctx, TarImage, []string{"tar", "-xmf", "-"}, content, claimName, namespace)
}

// Runs a pod with given image, command and stdin
// while having the volume mounted and working directory set to it.
func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []string, podInput io.Reader, claimName, namespace string) error {
var err error

cliConf := GetClientConfig()
restConf, err := cliConf.ClientConfig()
if err != nil {
return fmt.Errorf("cannot get client config: %w", err)
}
restConf.WarningHandler = restclient.NoWarnings{}

err = setConfigDefaults(restConf)
if err != nil {
return fmt.Errorf("cannot set config defaults: %w", err)
}

client, err := kubernetes.NewForConfig(restConf)
if err != nil {
return fmt.Errorf("cannot create k8s client: %w", err)
}

namespace, err = GetNamespace(namespace)
if err != nil {
return fmt.Errorf("cannot get namespace: %w", err)
}

podName := "volume-uploader-" + rand.String(5)

pods := client.CoreV1().Pods(namespace)

defer func() {
_ = pods.Delete(ctx, podName, metav1.DeleteOptions{})
}()

const volumeMntPoint = "/tmp/volume_mnt"
const pVol = "p-vol"
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: nil,
Annotations: nil,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: podName,
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: pVol,
MountPath: volumeMntPoint,
},
},
SecurityContext: defaultSecurityContext(),
},
},
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
}

localCtx, cancel := context.WithCancel(ctx)
defer cancel()
ready := podReady(localCtx, client.CoreV1(), podName, namespace)

_, err = pods.Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("cannot create pod: %w", err)
}

select {
case err = <-ready:
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 5):
err = errors.New("timeout waiting for pod to start")
}

if err != nil {
return fmt.Errorf("cannot start the pod: %w", err)
}

nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metav1.ListOptions{
FieldSelector: nameSelector,
Watch: true,
}
watcher, err := pods.Watch(localCtx, listOpts)
if err != nil {
return fmt.Errorf("cannot set up the watcher: %w", err)
}
defer watcher.Stop()
termCh := make(chan corev1.ContainerStateTerminated, 1)
go func() {
for event := range watcher.ResultChan() {
p, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
if len(p.Status.ContainerStatuses) > 0 {
termState := event.Object.(*corev1.Pod).Status.ContainerStatuses[0].State.Terminated
if termState != nil {
termCh <- *termState
break
}
}
}
}()

var outBuff tsBuff
err = attach(client.CoreV1().RESTClient(), restConf, podName, namespace, podInput, &outBuff, &outBuff)
if err != nil {
return fmt.Errorf("cannot attach stdio to the pod: %w", err)
}

termState := <-termCh
if termState.ExitCode != 0 {
cmdOut := strings.Trim(outBuff.String(), "\n")
err = fmt.Errorf("the command failed: exitcode=%d, out=%q", termState.ExitCode, cmdOut)
}

return err
}

// thread safe buffer for logging
type tsBuff struct {
buff bytes.Buffer
mu sync.Mutex
}

func (t *tsBuff) String() string {
t.mu.Lock()
defer t.mu.Unlock()
return t.buff.String()
}

func (t *tsBuff) Write(p []byte) (n int, err error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.buff.Write(p)
}
Loading

0 comments on commit 145e473

Please sign in to comment.