Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Pub/Sub to wait for operation completion #102

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
30 changes: 29 additions & 1 deletion pipelines/internal/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ package run
import (
"bufio"
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"flag"
Expand All @@ -125,6 +127,7 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/watch"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -213,6 +216,13 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
abort := make(chan os.Signal, 1)
signal.Notify(abort, os.Interrupt)

topic, err := newPubSubTopic(ctx, req.Pipeline.Resources.ProjectId)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to build topics in the tooling - gcloud can be used for that - I think we just want to support an optional flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is one feature that should be enabled by default. I changed the functionality to always use projects/<project-id>/topics/pipelines-tool, but only if it has the label created-by:pipelines-tool. If the tool has any problem using Pub/Sub, it will silently switch to long polling. The user can also specify --pub-sub=false to opt for long polling.

if err != nil {
return fmt.Errorf("creating Pub/Sub topic: %v", err)
}
defer topic.Delete(ctx)
req.PubSubTopic = topic.ID()

attempt := uint(1)
for {
req.Pipeline.Resources.VirtualMachine.Preemptible = (attempt <= *pvmAttempts)
Expand All @@ -236,7 +246,7 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
return nil
}

if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name}); err != nil {
if err := watch.Invoke(ctx, service, req.Pipeline.Resources.ProjectId, []string{lro.Name, "-topic", topic.ID()}); err != nil {
if err, ok := err.(common.PipelineExecutionError); ok && err.IsRetriable() {
if attempt < *pvmAttempts+*attempts {
attempt++
Expand All @@ -250,6 +260,24 @@ func runPipeline(ctx context.Context, service *genomics.Service, req *genomics.R
}
}

// newPubSubTopic creates a new Pub/Sub topic in the given project and
// returns it.
//
// The topic ID is the letter "t" followed by a random 64-bit number read from
// crypto/rand, formatted in decimal. On success the Pub/Sub client is left
// open because the returned topic was obtained from it (presumably the topic
// keeps using that client; confirm against the pubsub package). On any
// failure after the client has been created, the client is closed before
// returning so that it is not leaked.
func newPubSubTopic(ctx context.Context, projectID string) (*pubsub.Topic, error) {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("creating Pub/Sub client: %v", err)
	}

	var id uint64
	if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil {
		// Best-effort cleanup: the Close error is deliberately ignored so the
		// original failure is the one reported to the caller.
		client.Close()
		return nil, fmt.Errorf("generating topic name: %v", err)
	}

	topic, err := client.CreateTopic(ctx, fmt.Sprintf("t%d", id))
	if err != nil {
		// Best-effort cleanup, as above.
		client.Close()
		return nil, fmt.Errorf("creating topic: %v", err)
	}
	return topic, nil
}

func parseJSON(filename string, v interface{}) error {
f, err := os.Open(filename)
if err != nil {
Expand Down
86 changes: 70 additions & 16 deletions pipelines/internal/commands/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package watch

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"flag"
"fmt"
"sync"
"time"

"cloud.google.com/go/pubsub"
"github.com/googlegenomics/pipelines-tools/pipelines/internal/common"
genomics "google.golang.org/api/genomics/v2alpha1"
)
Expand All @@ -32,16 +36,20 @@ var (

actions = flags.Bool("actions", false, "show action details")
details = flags.Bool("details", false, "show event details")
topic = flags.String("topic", "", "the Pub/Sub topic to watch")
)

func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error {
names := common.ParseFlags(flags, arguments)
if len(names) < 1 {
return errors.New("missing operation name")
}
if *topic == "" {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will still want to support polling, and I don't think the topic needs to be passed in: when the operation data is first fetched, if it has a topic we'll start listening instead of polling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return errors.New("missing Pub/Sub topic")
}

name := common.ExpandOperationName(project, names[0])
result, err := watch(ctx, service, name)
result, err := watch(ctx, service, project, name, *topic)
if err != nil {
return fmt.Errorf("watching pipeline: %v", err)
}
Expand All @@ -54,26 +62,52 @@ func Invoke(ctx context.Context, service *genomics.Service, project string, argu
return nil
}

func watch(ctx context.Context, service *genomics.Service, name string) (interface{}, error) {
func watch(ctx context.Context, service *genomics.Service, project, name, topic string) (interface{}, error) {
sub, err := newPubSubSubscription(ctx, project, topic)
if err != nil {
return nil, fmt.Errorf("creating Pub/Sub subscription: %v", err)
}
defer sub.Delete(ctx)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var events []*genomics.Event
const initialDelay = 5 * time.Second
delay := initialDelay
for {
var response interface{}
var receiverErr error
var receiverLock sync.Mutex
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
receiverLock.Lock()
defer receiverLock.Unlock()
m.Ack()

exit := func(r interface{}, err error) {
if ctx.Err() != nil {
return
}
response = r
receiverErr = err
cancel()
}

lro, err := service.Projects.Operations.Get(name).Context(ctx).Do()
if err != nil {
return nil, fmt.Errorf("getting operation status: %v", err)
exit(nil, fmt.Errorf("getting operation status: %v", err))
return
}

var metadata genomics.Metadata
if err := json.Unmarshal(lro.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("parsing metadata: %v", err)
exit(nil, fmt.Errorf("parsing metadata: %v", err))
return
}

if *actions {
*actions = false
encoded, err := json.MarshalIndent(metadata.Pipeline.Actions, "", " ")
if err != nil {
return nil, fmt.Errorf("encoding actions: %v", err)
exit(nil, fmt.Errorf("encoding actions: %v", err))
return
}
fmt.Printf("%s\n", encoded)
}
Expand All @@ -88,20 +122,40 @@ func watch(ctx context.Context, service *genomics.Service, name string) (interfa
}
}
events = metadata.Events
delay = initialDelay
}

if lro.Done {
if lro.Error != nil {
return lro.Error, nil
exit(lro.Error, nil)
return
}
return lro.Response, nil
exit(lro.Response, nil)
}
})
if err != nil && err != context.Canceled {
return nil, fmt.Errorf("receiving message: %v", err)
}
return response, receiverErr
}

time.Sleep(delay)
delay = time.Duration(float64(delay) * 1.5)
if limit := time.Minute; delay > limit {
delay = limit
}
// newPubSubSubscription creates a new Pub/Sub subscription on the named topic
// in the given project and returns it.
//
// The subscription ID is the letter "s" followed by a random 64-bit number
// read from crypto/rand, formatted in decimal. The subscription is configured
// with a 10 second ack deadline and a 25 hour expiration policy. On success
// the Pub/Sub client is left open because the returned subscription was
// obtained from it (presumably the subscription keeps using that client;
// confirm against the pubsub package). On any failure after the client has
// been created, the client is closed before returning so that it is not
// leaked.
func newPubSubSubscription(ctx context.Context, projectID, topicName string) (*pubsub.Subscription, error) {
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("creating a Pub/Sub client: %v", err)
	}

	var id uint64
	if err := binary.Read(rand.Reader, binary.LittleEndian, &id); err != nil {
		// Best-effort cleanup: the Close error is deliberately ignored so the
		// original failure is the one reported to the caller.
		client.Close()
		return nil, fmt.Errorf("generating subscription name: %v", err)
	}

	sub, err := client.CreateSubscription(ctx, fmt.Sprintf("s%d", id), pubsub.SubscriptionConfig{
		Topic:            client.Topic(topicName),
		AckDeadline:      10 * time.Second,
		ExpirationPolicy: 25 * time.Hour,
	})
	if err != nil {
		// Best-effort cleanup, as above.
		client.Close()
		return nil, fmt.Errorf("creating subscription: %v", err)
	}
	return sub, nil
}