Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI Refresh: workflow signal command #432

Merged
merged 10 commits into from
Feb 1, 2024
64 changes: 48 additions & 16 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,21 @@ func NewTemporalWorkflowDeleteCommand(cctx *CommandContext, parent *TemporalWork
return &s
}

type WorkflowReferenceOptions struct {
WorkflowId string
RunId string
}

func (v *WorkflowReferenceOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(f, "workflow-id")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id.")
}

type TemporalWorkflowDescribeCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
ResetPoints bool
Raw bool
}
Expand All @@ -456,9 +466,7 @@ func NewTemporalWorkflowDescribeCommand(cctx *CommandContext, parent *TemporalWo
s.Command.Long = "The `temporal workflow describe` command shows information about a given\nWorkflow Execution.\n\nThis information can be used to locate Workflow Executions that weren't able to run successfully.\n\n`temporal workflow describe --workflow-id=meaningful-business-id`\n\nOutput can be shown as printed ('raw') or formatted to only show the Workflow Execution's auto-reset points.\n\n`temporal workflow describe --workflow-id=meaningful-business-id --raw=true --reset-points=true`\n\nUse the command options below to change the information returned by this command."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().BoolVar(&s.ResetPoints, "reset-points", false, "Only show auto-reset points.")
s.Command.Flags().BoolVar(&s.Raw, "raw", false, "Print properties without changing their format.")
s.Command.Run = func(c *cobra.Command, args []string) {
Expand Down Expand Up @@ -595,10 +603,9 @@ func NewTemporalWorkflowResetBatchCommand(cctx *CommandContext, parent *Temporal
}

type TemporalWorkflowShowCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
ResetPoints bool
Follow bool
}
Expand All @@ -615,9 +622,7 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Long = "The `temporal workflow show` command provides the Event History for a\nWorkflow Execution.\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().BoolVar(&s.ResetPoints, "reset-points", false, "Only show auto-reset points.")
s.Command.Flags().BoolVar(&s.Follow, "follow", false, "Follow the progress of a Workflow Execution if it goes to a new run.")
s.Command.Run = func(c *cobra.Command, args []string) {
Expand All @@ -628,19 +633,46 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
return &s
}

type SingleWorkflowOrBatchOptions struct {
WorkflowId string
RunId string
Query string
Reason string
Yes bool
}

func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.")
f.StringVarP(&v.Query, "query", "q", "", "Start a batch to Signal Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed if query is present. Defaults to message with user name and time.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
}

type TemporalWorkflowSignalCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
PayloadInputOptions
Name string
SingleWorkflowOrBatchOptions
}

func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalCommand {
var s TemporalWorkflowSignalCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "signal [flags]"
s.Command.Short = "Signal Workflow Execution by Id or List Filter."
s.Command.Long = "TODO"
s.Command.Short = "Signal Workflow Execution by Id."
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow signal\x1b[0m command is used to Signal a\nWorkflow Execution by ID.\n\n\x1b[1mtemporal workflow signal \\\n\t\t--workflow-id MyWorkflowId \\\n\t\t--name MySignal \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\x1b[0m\n\nUse the options listed below to change the command's behavior."
} else {
s.Command.Long = "The `temporal workflow signal` command is used to Signal a\nWorkflow Execution by ID.\n\n```\ntemporal workflow signal \\\n\t\t--workflow-id MyWorkflowId \\\n\t\t--name MySignal \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\n```\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.Name, "name", "", "Signal Name.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
22 changes: 21 additions & 1 deletion temporalcli/commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package temporalcli

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -59,7 +60,8 @@ type CommandOptions struct {
// related to env config stuff above.
LookupEnv func(string) (string, bool)

// These two fields below default to OS values
// These three fields below default to OS values
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer

Expand Down Expand Up @@ -87,6 +89,9 @@ func (c *CommandContext) preprocessOptions() error {
c.Options.LookupEnv = os.LookupEnv
}

if c.Options.Stdin == nil {
c.Options.Stdin = os.Stdin
}
if c.Options.Stdout == nil {
c.Options.Stdout = os.Stdout
}
Expand Down Expand Up @@ -249,6 +254,21 @@ func (c *CommandContext) populateFlagsFromEnv(flags *pflag.FlagSet) error {
return flagErr
}

// Returns error if JSON output enabled
func (c *CommandContext) promptYes(message string, autoConfirm bool) (bool, error) {
if c.JSONOutput && !autoConfirm {
return false, fmt.Errorf("must bypass prompts when using JSON output")
}
c.Printer.Print(message, " ")
if autoConfirm {
c.Printer.Println("yes")
return true, nil
}
line, _ := bufio.NewReader(c.Options.Stdin).ReadString('\n')
line = strings.TrimSpace(strings.ToLower(line))
return line == "y" || line == "yes", nil
}

// Execute runs the Temporal CLI with the given context and options. This
// intentionally does not return an error but rather invokes Fail on the
// options.
Expand Down
134 changes: 131 additions & 3 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package temporalcli

import "fmt"
import (
"fmt"
"os/user"

"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)

func (*TemporalWorkflowCancelCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
Expand All @@ -22,8 +32,52 @@ func (*TemporalWorkflowResetBatchCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (*TemporalWorkflowSignalCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get input payloads
input, err := c.buildRawInputPayloads()
if err != nil {
return err
}

exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this just happen to be the first command implemented for which batch was an option?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

if err != nil {
return err
}

// Run single or batch
if exec != nil {
// We have to use the raw signal service call here because the Go SDK's
// signal call doesn't accept multiple arguments.
_, err = cl.WorkflowService().SignalWorkflowExecution(cctx, &workflowservice.SignalWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
WorkflowExecution: &common.WorkflowExecution{WorkflowId: c.WorkflowId, RunId: c.RunId},
SignalName: c.Name,
Input: input,
Identity: clientIdentity(),
})
if err != nil {
return fmt.Errorf("failed signalling workflow: %w", err)
}
cctx.Printer.Println("Signal workflow succeeded")
} else if batchReq != nil {
batchReq.Operation = &workflowservice.StartBatchOperationRequest_SignalOperation{
SignalOperation: &batch.BatchOperationSignal{
Signal: c.Name,
Input: input,
Identity: clientIdentity(),
},
}
if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the contract of workflowExecOrBatch is such that one or other of the above conditions must be true, so I'd like the code not to appear to allow neither to be executed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is leftover from previous code where it could return all nils when you declined the prompt, but I decided to change that to be an error and did not change this.

return nil
}

func (*TemporalWorkflowStackCommand) run(*CommandContext, []string) error {
Expand All @@ -41,3 +95,77 @@ func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it's a private function, it would be good to document this something like

This function guarantees that exactly one of the 3 returned values will be non-nil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concur

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK great, I can make these minor changes.

func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
cctx *CommandContext,
namespace string,
cl client.Client,
) (*common.WorkflowExecution, *workflowservice.StartBatchOperationRequest, error) {
// If workflow is set, we return single execution
if s.WorkflowId != "" {
if s.Query != "" {
return nil, nil, fmt.Errorf("cannot set query when workflow ID is set")
} else if s.Reason != "" {
return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set")
} else if s.Yes {
return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set")
}
return &common.WorkflowExecution{WorkflowId: s.WorkflowId, RunId: s.RunId}, nil, nil
}

// Check query is set properly
if s.Query == "" {
return nil, nil, fmt.Errorf("must set either workflow ID or query")
} else if s.WorkflowId != "" {
return nil, nil, fmt.Errorf("cannot set workflow ID when query is set")
} else if s.RunId != "" {
return nil, nil, fmt.Errorf("cannot set run ID when query is set")
}

// Count the workflows that will be affected
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
if err != nil {
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
}
yes, err := cctx.promptYes(
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), s.Yes)
if err != nil {
return nil, nil, err
} else if !yes {
// We consider this a command failure
return nil, nil, fmt.Errorf("user denied confirmation")
}

// Default the reason if not set
reason := s.Reason
if reason == "" {
username := "<unknown-user>"
if u, err := user.Current(); err != nil && u.Username != "" {
username = u.Username
}
reason = "Requested from CLI by " + username
}
Comment on lines +139 to +147
Copy link
Member Author

@cretz cretz Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current CLI requires reason to be set by user. UI defaults to something like Terminated from Web UI by foo@bar.com, so we follow a similar format, but we just do "Requested" instead of specific verb (batch job carries its own timestamp info).


return nil, &workflowservice.StartBatchOperationRequest{
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: reason,
}, nil
}

func startBatchJob(cctx *CommandContext, cl client.Client, req *workflowservice.StartBatchOperationRequest) error {
_, err := cl.WorkflowService().StartBatchOperation(cctx, req)
if err != nil {
return fmt.Errorf("failed starting batch operation: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
BatchJobID string `json:"batchJobId"`
}{BatchJobID: req.JobId},
printer.StructuredOptions{})
}
cctx.Printer.Printlnf("Started batch for job ID: %v", req.JobId)
return nil
}
19 changes: 16 additions & 3 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ func (w *WorkflowStartOptions) buildStartOptions() (client.StartWorkflowOptions,
}

func (p *PayloadInputOptions) buildRawInput() ([]any, error) {
payloads, err := p.buildRawInputPayloads()
if err != nil {
return nil, err
}
// Convert to raw values that our special data converter understands
ret := make([]any, len(payloads.Payloads))
for i, payload := range payloads.Payloads {
ret[i] = rawValue{payload}
}
return ret, nil
}

func (p *PayloadInputOptions) buildRawInputPayloads() (*common.Payloads, error) {
// Get input strings
var inData [][]byte
for _, in := range p.Input {
Expand All @@ -300,8 +313,8 @@ func (p *PayloadInputOptions) buildRawInput() ([]any, error) {
metadata[metaPieces[0]] = []byte(metaPieces[1])
}

// Convert to raw values
ret := make([]any, len(inData))
// Create payloads
ret := &common.Payloads{Payloads: make([]*common.Payload, len(inData))}
for i, in := range inData {
// First, if it's JSON, validate that it is accurate
if strings.HasPrefix(string(metadata["encoding"]), "json/") && !json.Valid(in) {
Expand All @@ -314,7 +327,7 @@ func (p *PayloadInputOptions) buildRawInput() ([]any, error) {
return nil, fmt.Errorf("input #%v is not valid base64", i+1)
}
}
ret[i] = rawValue{payload: &common.Payload{Data: in, Metadata: metadata}}
ret.Payloads[i] = &common.Payload{Data: in, Metadata: metadata}
}
return ret, nil
}
Expand Down
Loading
Loading