Skip to content

Commit

Permalink
Add batch support to signal
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Jan 30, 2024
1 parent 2d07efc commit 0819721
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 21 deletions.
20 changes: 18 additions & 2 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,28 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
return &s
}

type SingleWorkflowOrBatchOptions struct {
WorkflowId string
RunId string
Query string
Reason string
Yes bool
}

func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.")
f.StringVarP(&v.Query, "query", "q", "", "Start a batch to Signal Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed, and required if query is present.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
}

type TemporalWorkflowSignalCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
PayloadInputOptions
Name string
SingleWorkflowOrBatchOptions
}

func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalCommand {
Expand All @@ -653,10 +669,10 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
s.Command.Long = "The `temporal workflow signal` command is used to Signal a\nWorkflow Execution by ID.\n\n```\ntemporal workflow signal \\\n\t\t--workflow-id MyWorkflowId \\\n\t\t--name MySignal \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\n```\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.Name, "name", "", "Signal Name.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
22 changes: 21 additions & 1 deletion temporalcli/commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package temporalcli

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -59,7 +60,8 @@ type CommandOptions struct {
// related to env config stuff above.
LookupEnv func(string) (string, bool)

// These two fields below default to OS values
// These three fields below default to OS values
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer

Expand Down Expand Up @@ -87,6 +89,9 @@ func (c *CommandContext) preprocessOptions() error {
c.Options.LookupEnv = os.LookupEnv
}

if c.Options.Stdin == nil {
c.Options.Stdin = os.Stdin
}
if c.Options.Stdout == nil {
c.Options.Stdout = os.Stdout
}
Expand Down Expand Up @@ -249,6 +254,21 @@ func (c *CommandContext) populateFlagsFromEnv(flags *pflag.FlagSet) error {
return flagErr
}

// Returns error if JSON output enabled
func (c *CommandContext) promptYes(message string, autoConfirm bool) (bool, error) {
if c.JSONOutput && !autoConfirm {
return false, fmt.Errorf("must bypass prompts when using JSON output")
}
c.Printer.Print(message, " ")
if autoConfirm {
c.Printer.Println("yes")
return true, nil
}
line, _ := bufio.NewReader(c.Options.Stdin).ReadString('\n')
line = strings.TrimSpace(strings.ToLower(line))
return line == "y" || line == "yes", nil
}

// Execute runs the Temporal CLI with the given context and options. This
// intentionally does not return an error but rather invokes Fail on the
// options.
Expand Down
111 changes: 100 additions & 11 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package temporalcli
import (
"fmt"

"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)

func (*TemporalWorkflowCancelCommand) run(*CommandContext, []string) error {
Expand Down Expand Up @@ -40,19 +44,38 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
return err
}

// Send signal. We have to use the raw signal service call here because the Go
// SDK's signal call doesn't accept multiple arguments.
_, err = cl.WorkflowService().SignalWorkflowExecution(cctx, &workflowservice.SignalWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
WorkflowExecution: &common.WorkflowExecution{WorkflowId: c.WorkflowId, RunId: c.RunId},
SignalName: c.Name,
Input: input,
Identity: clientIdentity(),
})
exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl)
if err != nil {
return fmt.Errorf("failed signalling workflow: %w", err)
return err
}

// Run single or batch or nothing
if exec != nil {
// We have to use the raw signal service call here because the Go SDK's
// signal call doesn't accept multiple arguments.
_, err = cl.WorkflowService().SignalWorkflowExecution(cctx, &workflowservice.SignalWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
WorkflowExecution: &common.WorkflowExecution{WorkflowId: c.WorkflowId, RunId: c.RunId},
SignalName: c.Name,
Input: input,
Identity: clientIdentity(),
})
if err != nil {
return fmt.Errorf("failed signalling workflow: %w", err)
}
cctx.Printer.Println("Signal workflow succeeded")
} else if batchReq != nil {
batchReq.Operation = &workflowservice.StartBatchOperationRequest_SignalOperation{
SignalOperation: &batch.BatchOperationSignal{
Signal: c.Name,
Input: input,
Identity: clientIdentity(),
},
}
if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}
cctx.Printer.Println("Signal workflow succeeded")
return nil
}

Expand All @@ -71,3 +94,69 @@ func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

// All results can be nil (e.g. if user declined batch)
func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
cctx *CommandContext,
namespace string,
cl client.Client,
) (*common.WorkflowExecution, *workflowservice.StartBatchOperationRequest, error) {
// If workflow is set, we return single execution
if s.WorkflowId != "" {
if s.Query != "" {
return nil, nil, fmt.Errorf("cannot set query when workflow ID is set")
} else if s.Reason != "" {
return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set")
} else if s.Yes {
return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set")
}
return &common.WorkflowExecution{WorkflowId: s.WorkflowId, RunId: s.RunId}, nil, nil
}

// Check query is set properly
if s.Query == "" {
return nil, nil, fmt.Errorf("must set either workflow ID or query")
} else if s.WorkflowId != "" {
return nil, nil, fmt.Errorf("cannot set workflow ID when query is set")
} else if s.RunId != "" {
return nil, nil, fmt.Errorf("cannot set run ID when query is set")
} else if s.Reason == "" {
return nil, nil, fmt.Errorf("reason required when query is set")
}

// Count the workflows that will be affected
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
if err != nil {
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
}
yes, err := cctx.promptYes(
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), s.Yes)
if err != nil {
return nil, nil, err
} else if !yes {
// We consider this a command failure
return nil, nil, fmt.Errorf("user denied confirmation")
}
return nil, &workflowservice.StartBatchOperationRequest{
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: s.Reason,
}, nil
}

func startBatchJob(cctx *CommandContext, cl client.Client, req *workflowservice.StartBatchOperationRequest) error {
_, err := cl.WorkflowService().StartBatchOperation(cctx, req)
if err != nil {
return fmt.Errorf("failed starting batch operation: %w", err)
}
cctx.Logger.Info("Started batch", "jobId", req.JobId)
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
BatchJobID string `json:"batchJobId"`
}{BatchJobID: req.JobId},
printer.StructuredOptions{})
}
return nil
}
81 changes: 80 additions & 1 deletion temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package temporalcli_test

import (
"encoding/json"
"time"

"github.com/google/uuid"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
)

func (s *SharedServerSuite) TestWorkflow_Signal_SimpleSuccess() {
func (s *SharedServerSuite) TestWorkflow_Signal_SingleWorkflowSuccess() {
// Make workflow wait for signal and then return it
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
var ret any
Expand Down Expand Up @@ -37,3 +42,77 @@ func (s *SharedServerSuite) TestWorkflow_Signal_SimpleSuccess() {
s.NoError(run.Get(s.Context, &actual))
s.Equal(map[string]any{"foo": "bar"}, actual)
}

func (s *SharedServerSuite) TestWorkflow_Signal_BatchWorkflowSuccess() {
res := s.testSignalBatchWorkflow(false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.ContainsOnSameLine(res.Stderr.String(), "Started batch", "jobId")
}

func (s *SharedServerSuite) TestWorkflow_Signal_BatchWorkflowSuccessJSON() {
res := s.testSignalBatchWorkflow(true)
s.ContainsOnSameLine(res.Stderr.String(), "Started batch", "jobId")
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testSignalBatchWorkflow(json bool) *CommandResult {
// Make workflow wait for signal and then return it
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
var ret any
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, &ret)
return ret, nil
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker.Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
runs[i] = run
}

// Wait for all to appear in list
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) == len(runs)
}, 3*time.Second, 100*time.Millisecond)

// Send batch signal with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "signal",
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "for a test",
"--name", "my-signal",
"-i", `{"key": "val"}`,
}
if json {
args = append(args, "--yes", "-o", "json")
} else {
s.CommandHarness.Stdin.WriteString("y\n")
}
res := s.Execute(args...)
s.NoError(res.Err)

// Confirm that all workflows complete with the signal value
for _, run := range runs {
var ret map[string]string
s.NoError(run.Get(s.Context, &ret))
s.Equal(map[string]string{"key": "val"}, ret)
}
return res
}
Loading

0 comments on commit 0819721

Please sign in to comment.