Skip to content

Commit

Permalink
Fix for missing k8s logs (#137)
Browse files Browse the repository at this point in the history
* Backoff loop and wait
  • Loading branch information
jzh0u authored and glyn committed Oct 7, 2019
1 parent 10a206a commit bba4b7e
Showing 1 changed file with 48 additions and 19 deletions.
67 changes: 48 additions & 19 deletions driver/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

// load credential helpers
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand All @@ -28,6 +29,7 @@ import (
const (
k8sContainerName = "invocation"
k8sFileSecretVolume = "files"
numBackoffLoops = 6
)

// Driver runs an invocation image in a Kubernetes cluster.
Expand Down Expand Up @@ -234,22 +236,29 @@ func (k *Driver) Run(op *driver.Operation) (driver.OperationResult, error) {
return driver.OperationResult{}, nil
}

selector := metav1.ListOptions{
// Create a selector to detect the job just created
jobSelector := metav1.ListOptions{
LabelSelector: labels.Set(job.ObjectMeta.Labels).String(),
FieldSelector: newSingleFieldSelector("metadata.name", job.ObjectMeta.Name),
}

return driver.OperationResult{}, k.watchJobStatusAndLogs(selector, op.Out)
// Prevent detecting pods from prior jobs by adding the job name to the labels
podSelector := metav1.ListOptions{
LabelSelector: newSingleFieldSelector("job-name", job.ObjectMeta.Name),
}

return driver.OperationResult{}, k.watchJobStatusAndLogs(podSelector, jobSelector, op.Out)
}

func (k *Driver) watchJobStatusAndLogs(selector metav1.ListOptions, out io.Writer) error {
func (k *Driver) watchJobStatusAndLogs(podSelector metav1.ListOptions, jobSelector metav1.ListOptions, out io.Writer) error {
// Stream Pod logs in the background
logsStreamingComplete := make(chan bool)
err := k.streamPodLogs(selector, out, logsStreamingComplete)
err := k.streamPodLogs(podSelector, out, logsStreamingComplete)
if err != nil {
return err
}
// Watch job events and exit on failure/success
watch, err := k.jobs.Watch(selector)
watch, err := k.jobs.Watch(jobSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,22 +315,36 @@ func (k *Driver) streamPodLogs(options metav1.ListOptions, out io.Writer, done c
// The event was for a pod whose logs have already been streamed, so do nothing.
continue
}
req := k.pods.GetLogs(podName, &v1.PodLogOptions{
Container: k8sContainerName,
Follow: true,
})
reader, err := req.Stream()
// There was an error connecting to the pod, so continue the loop and attempt streaming
// logs again next time there is an event for the same pod.
if err != nil {
continue

for i := 0; i < numBackoffLoops; i++ {
time.Sleep(time.Duration(i*i/2) * time.Second)
req := k.pods.GetLogs(podName, &v1.PodLogOptions{
Container: k8sContainerName,
Follow: true,
})
reader, err := req.Stream()
if err != nil {
// There was an error connecting to the pod, so continue the loop and attempt streaming
// the logs again.
continue
}

// Block the loop until all logs from the pod have been processed.
bytesRead, err := io.Copy(out, reader)
reader.Close()
if err != nil {
continue
}
if bytesRead == 0 {
// There is a chance where we have connected to the pod, but it has yet to write something.
// In that case, we continue to to keep streaming until it does.
continue
}
// Set the pod to have successfully streamed data.
streamedLogs[podName] = true
break
}

// We successfully connected to the pod, so mark it as having streamed logs.
streamedLogs[podName] = true
// Block the loop until all logs from the pod have been processed.
io.Copy(out, reader)
reader.Close()
done <- true
}
}()
Expand Down Expand Up @@ -377,6 +400,12 @@ func generateFileSecret(files map[string]string) (*v1.Secret, []v1.VolumeMount)
return secret, mounts
}

func newSingleFieldSelector(k, v string) string {
return labels.Set(map[string]string{
k: v,
}).String()
}

func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
Expand Down

0 comments on commit bba4b7e

Please sign in to comment.