Skip to content

Commit

Permalink
feat: cancel pipeline run on CTRL^C
Browse files Browse the repository at this point in the history
Signed-off-by: Matej Vasek <mvasek@redhat.com>
  • Loading branch information
matejvasek committed Oct 13, 2022
1 parent 4961298 commit df24b62
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions pipelines/tekton/pipeplines_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tekton

import (
"context"
goErrors "errors"
"fmt"
"sync"

Expand Down Expand Up @@ -149,9 +150,14 @@ func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) error {
return fmt.Errorf("problem in creating pipeline run: %v", err)
}

err = pp.watchPipelineRunProgress(pr)
err = pp.watchPipelineRunProgress(ctx, pr)
if err != nil {
return fmt.Errorf("problem in watching started pipeline run: %v", err)
if !goErrors.Is(err, context.Canceled) {
return fmt.Errorf("problem in watching started pipeline run: %v", err)
}
// TODO replace deletion with pipeline-run cancellation
_ = client.PipelineRuns(pp.namespace).Delete(context.TODO(), pr.Name, metav1.DeleteOptions{})
return fmt.Errorf("pipeline run cancelled: %w", context.Canceled)
}

pr, err = client.PipelineRuns(pp.namespace).Get(ctx, pr.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -236,7 +242,7 @@ func (pp *PipelinesProvider) Remove(ctx context.Context, f fn.Function) error {

// watchPipelineRunProgress watches the progress of the input PipelineRun
// and prints detailed description of the currently executed Tekton Task.
func (pp *PipelinesProvider) watchPipelineRunProgress(pr *v1beta1.PipelineRun) error {
func (pp *PipelinesProvider) watchPipelineRunProgress(ctx context.Context, pr *v1beta1.PipelineRun) error {
taskProgressMsg := map[string]string{
taskNameFetchSources: "Fetching git repository with the function source code",
taskNameBuild: "Building function image on the cluster",
Expand All @@ -250,9 +256,23 @@ func (pp *PipelinesProvider) watchPipelineRunProgress(pr *v1beta1.PipelineRun) e

prTracker := pipelinerun.NewTracker(pr.Name, pp.namespace, clientset)
trChannel := prTracker.Monitor([]string{})

ctxDone := ctx.Done()
wg := sync.WaitGroup{}
for trs := range trChannel {
out:
for {
var trs []taskrun.Run
var ok bool

select {
case trs, ok = <-trChannel:
if !ok {
break out
}
case <-ctxDone:
err = ctx.Err()
break out
}

wg.Add(len(trs))

for _, run := range trs {
Expand All @@ -271,7 +291,7 @@ func (pp *PipelinesProvider) watchPipelineRunProgress(pr *v1beta1.PipelineRun) e
}
wg.Wait()

return nil
return err
}

// getFailedPipelineRunLog returns log message for a failed PipelineRun,
Expand Down

0 comments on commit df24b62

Please sign in to comment.