Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

On cluster build without VCS #1298

Merged
merged 8 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ func runDeploy(cmd *cobra.Command, _ []string, newClient ClientFactory) (err err

// Perform the deployment either remote or local.
if config.Remote {
if f.Build.Git.URL == "" {
return ErrURLRequired // Provides CLI-specific help text
}
// Invoke a remote build/push/deploy pipeline
// Returned is the function with fields like Registry and Image populated.
if f, err = client.RunPipeline(cmd.Context(), f); err != nil {
Expand Down Expand Up @@ -736,9 +733,3 @@ var ErrRegistryRequired = errors.New(`A container registry is required. For exa
--registry docker.io/myusername

To run the command in an interactive mode, use --confirm (-c)`)

var ErrURLRequired = errors.New(`The function is not associated with a Git repository, and needs one in order to perform a remote deployment. For example:

--remote --git-url=https://git.example.com/namespace/myFunction

To run the deploy command in an interactive mode, use --confirm (-c)`)
9 changes: 0 additions & 9 deletions cmd/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,6 @@ func TestDeploy_RemoteBuildURLPermutations(t *testing.T) {
if remote != "" && remote != "false" { // default "" is == false.
// REMOTE Assertions

// TODO: (enhancement) allow triggering remote deploy without Git.
// This would tar up the local filesystem and send it to the cluster
// build and deploy. For now URL is required when triggering remote.
if url == "" && err == nil {
t.Fatal("error expected when --remote without a --git-url")
} else {
return // test successfully confirmed this error case
}

if !pipeliner.RunInvoked { // Remote deployer should be triggered
t.Error("remote was not invoked")
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ require (
knative.dev/serving v0.34.1-0.20221005094629-080aaa5c6241
)

require github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06

require (
cloud.google.com/go/compute v1.10.0 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
Expand Down Expand Up @@ -189,7 +191,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
Expand Down
45 changes: 20 additions & 25 deletions k8s/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
img = i
}

runAsNonRoot := true
pod := &coreV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: c.podName,
Expand All @@ -144,17 +143,12 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: c.podName,
Image: img,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: &coreV1.SecurityContext{
Privileged: new(bool),
AllowPrivilegeEscalation: new(bool),
RunAsNonRoot: &runAsNonRoot,
Capabilities: &coreV1.Capabilities{Drop: []coreV1.Capability{"ALL"}},
},
Name: c.podName,
Image: img,
Stdin: true,
StdinOnce: true,
Command: []string{"socat", "-u", "-", "OPEN:/dev/null"},
SecurityContext: defaultSecurityContext(),
},
},
DNSPolicy: coreV1.DNSClusterFirst,
Expand All @@ -163,7 +157,7 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
}
creatOpts := metaV1.CreateOptions{}

ready := c.podReady(ctx)
ready := podReady(ctx, c.coreV1, c.podName, c.namespace)

_, err = pods.Create(ctx, pod, creatOpts)
if err != nil {
Expand All @@ -184,7 +178,7 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {

// attaching to the stdin to automatically Complete the pod on exit
go func() {
_ = c.attach(emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
_ = attach(c.coreV1.RESTClient(), c.restConf, c.podName, c.namespace, emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
}()

return nil
Expand Down Expand Up @@ -229,23 +223,21 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
})
}

func (c *contextDialer) attach(in io.Reader, out, errOut io.Writer) error {

restClient := c.coreV1.RESTClient()
func attach(restClient restclient.Interface, restConf *restclient.Config, podName, namespace string, in io.Reader, out, errOut io.Writer) error {
req := restClient.Post().
Resource("pods").
Name(c.podName).
Namespace(c.namespace).
Name(podName).
Namespace(namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: c.podName,
Container: podName,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)

executor, err := remotecommand.NewSPDYExecutor(c.restConf, "POST", req.URL())
executor, err := remotecommand.NewSPDYExecutor(restConf, "POST", req.URL())
if err != nil {
return err
}
Expand All @@ -258,13 +250,13 @@ func (c *contextDialer) attach(in io.Reader, out, errOut io.Writer) error {
})
}

func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
func podReady(ctx context.Context, core v1.CoreV1Interface, podName, namespace string) (errChan <-chan error) {
d := make(chan error)
errChan = d

pods := c.coreV1.Pods(c.namespace)
pods := core.Pods(namespace)

nameSelector := fields.OneTermEqualSelector("metadata.name", c.podName).String()
nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metaV1.ListOptions{
Watch: true,
FieldSelector: nameSelector,
Expand All @@ -278,7 +270,10 @@ func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
defer watcher.Stop()
ch := watcher.ResultChan()
for event := range ch {
pod := event.Object.(*coreV1.Pod)
pod, ok := event.Object.(*coreV1.Pod)
if !ok {
continue
}

if event.Type == watch.Modified {
for _, status := range pod.Status.ContainerStatuses {
Expand Down
173 changes: 173 additions & 0 deletions k8s/persistent_volumes.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package k8s

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)

func GetPersistentVolumeClaim(ctx context.Context, name, namespaceOverride string) (*corev1.PersistentVolumeClaim, error) {
Expand Down Expand Up @@ -51,3 +62,165 @@ func DeletePersistentVolumeClaims(ctx context.Context, namespaceOverride string,

return client.CoreV1().PersistentVolumeClaims(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions)
}

var TarImage = "quay.io/boson/alpine-socat:1.7.4.3-r1-non-root"

// UploadToVolume uploads files (passed in form of tar stream) into volume.
func UploadToVolume(ctx context.Context, content io.Reader, claimName, namespace string) error {
return runWithVolumeMounted(ctx, TarImage, []string{"tar", "-xmf", "-"}, content, claimName, namespace)
}

// Runs a pod with given image, command and stdin
// while having the volume mounted and working directory set to it.
func runWithVolumeMounted(ctx context.Context, podImage string, podCommand []string, podInput io.Reader, claimName, namespace string) error {
var err error

cliConf := GetClientConfig()
restConf, err := cliConf.ClientConfig()
if err != nil {
return fmt.Errorf("cannot get client config: %w", err)
}
restConf.WarningHandler = restclient.NoWarnings{}

err = setConfigDefaults(restConf)
if err != nil {
return fmt.Errorf("cannot set config defaults: %w", err)
}

client, err := kubernetes.NewForConfig(restConf)
if err != nil {
return fmt.Errorf("cannot create k8s client: %w", err)
}

namespace, err = GetNamespace(namespace)
if err != nil {
return fmt.Errorf("cannot get namespace: %w", err)
}

podName := "volume-uploader-" + rand.String(5)

pods := client.CoreV1().Pods(namespace)

defer func() {
_ = pods.Delete(ctx, podName, metav1.DeleteOptions{})
}()

const volumeMntPoint = "/tmp/volume_mnt"
const pVol = "p-vol"
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: nil,
Annotations: nil,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: podName,
Image: podImage,
Stdin: true,
StdinOnce: true,
WorkingDir: volumeMntPoint,
Command: podCommand,
VolumeMounts: []corev1.VolumeMount{
{
Name: pVol,
MountPath: volumeMntPoint,
},
},
SecurityContext: defaultSecurityContext(),
},
},
Volumes: []corev1.Volume{{
Name: pVol,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
DNSPolicy: corev1.DNSClusterFirst,
RestartPolicy: corev1.RestartPolicyNever,
},
}

localCtx, cancel := context.WithCancel(ctx)
defer cancel()
ready := podReady(localCtx, client.CoreV1(), podName, namespace)

_, err = pods.Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("cannot create pod: %w", err)
}

select {
case err = <-ready:
case <-ctx.Done():
err = ctx.Err()
case <-time.After(time.Minute * 5):
err = errors.New("timeout waiting for pod to start")
}

if err != nil {
return fmt.Errorf("cannot start the pod: %w", err)
}

nameSelector := fields.OneTermEqualSelector("metadata.name", podName).String()
listOpts := metav1.ListOptions{
FieldSelector: nameSelector,
Watch: true,
}
watcher, err := pods.Watch(localCtx, listOpts)
if err != nil {
return fmt.Errorf("cannot set up the watcher: %w", err)
}
defer watcher.Stop()
termCh := make(chan corev1.ContainerStateTerminated, 1)
go func() {
for event := range watcher.ResultChan() {
p, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
if len(p.Status.ContainerStatuses) > 0 {
termState := event.Object.(*corev1.Pod).Status.ContainerStatuses[0].State.Terminated
if termState != nil {
termCh <- *termState
break
}
}
}
}()

var outBuff tsBuff
err = attach(client.CoreV1().RESTClient(), restConf, podName, namespace, podInput, &outBuff, &outBuff)
if err != nil {
return fmt.Errorf("cannot attach stdio to the pod: %w", err)
}

termState := <-termCh
if termState.ExitCode != 0 {
cmdOut := strings.Trim(outBuff.String(), "\n")
err = fmt.Errorf("the command failed: exitcode=%d, out=%q", termState.ExitCode, cmdOut)
}

return err
}

// thread safe buffer for logging
type tsBuff struct {
buff bytes.Buffer
mu sync.Mutex
}

func (t *tsBuff) String() string {
t.mu.Lock()
defer t.mu.Unlock()
return t.buff.String()
}

func (t *tsBuff) Write(p []byte) (n int, err error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.buff.Write(p)
}
Loading