Skip to content

Commit

Permalink
Add workflow update start and execute subcommands
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Aug 28, 2024
1 parent f46c45c commit b618785
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 70 deletions.
93 changes: 75 additions & 18 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,9 +2156,9 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor
s.Command.Use = "execute [flags]"
s.Command.Short = "Start new Workflow Execution"
if hasHighlighting {
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n\x1b[1mtemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"Input\": \"As-JSON\"}'\x1b[0m\n\nUse \x1b[1m--event-details\x1b[0m to relay updates to the command-line output in JSON\nformat. When using JSON output (\x1b[1m--output json\x1b[0m), this includes the entire\n\"history\" JSON key for the run."
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n\x1b[1mtemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m\n\nUse \x1b[1m--event-details\x1b[0m to relay updates to the command-line output in JSON\nformat. When using JSON output (\x1b[1m--output json\x1b[0m), this includes the entire\n\"history\" JSON key for the run."
} else {
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n```\ntemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"Input\": \"As-JSON\"}'\n```\n\nUse `--event-details` to relay updates to the command-line output in JSON\nformat. When using JSON output (`--output json`), this includes the entire\n\"history\" JSON key for the run."
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n```\ntemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```\n\nUse `--event-details` to relay updates to the command-line output in JSON\nformat. When using JSON output (`--output json`), this includes the entire\n\"history\" JSON key for the run."
}
s.Command.Args = cobra.NoArgs
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
Expand Down Expand Up @@ -2506,9 +2506,9 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.Use = "start [flags]"
s.Command.Short = "Initiate a Workflow Execution"
if hasHighlighting {
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n\x1b[1mtemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\x1b[0m"
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n\x1b[1mtemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n```\ntemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\n```"
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n```\ntemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
Expand Down Expand Up @@ -2598,34 +2598,91 @@ func NewTemporalWorkflowTraceCommand(cctx *CommandContext, parent *TemporalWorkf
type TemporalWorkflowUpdateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
PayloadInputOptions
}

func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateCommand {
var s TemporalWorkflowUpdateCommand
s.Parent = parent
s.Command.Use = "update"
s.Command.Short = "Start and wait for Updates"
s.Command.Long = "An Update is a synchronous call to a Workflow Execution that can change its\nstate, control its flow, and return a result."
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalWorkflowUpdateExecuteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowUpdateStartCommand(cctx, &s).Command)
return &s
}

type UpdateOptions struct {
Name string
WorkflowId string
UpdateId string
RunId string
FirstExecutionRunId string
}

func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateCommand {
var s TemporalWorkflowUpdateCommand
func (v *UpdateOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVar(&v.Name, "name", "", "Handler method name. Required. Aliased as \"--type\".")
_ = cobra.MarkFlagRequired(f, "name")
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required.")
_ = cobra.MarkFlagRequired(f, "workflow-id")
f.StringVar(&v.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID. Must be unique per Workflow Execution.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. If unset, updates the currently-running Workflow Execution.")
f.StringVar(&v.FirstExecutionRunId, "first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
}

type TemporalWorkflowUpdateExecuteCommand struct {
Parent *TemporalWorkflowUpdateCommand
Command cobra.Command
UpdateOptions
PayloadInputOptions
}

func NewTemporalWorkflowUpdateExecuteCommand(cctx *CommandContext, parent *TemporalWorkflowUpdateCommand) *TemporalWorkflowUpdateExecuteCommand {
var s TemporalWorkflowUpdateExecuteCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "update [flags]"
s.Command.Short = "Synchronously run a Workflow update handler"
s.Command.Use = "execute [flags]"
s.Command.Short = "Send an Update and wait for it to complete"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an update handler. An update\ncan change the state of a Workflow Execution and return a response:\n\n\x1b[1mtemporal workflow update \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"Input\": \"As-JSON\"}'\x1b[0m"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete or fail. You can also use this to wait for an existing\nupdate to complete, by submitting an existing update ID.\n\n\x1b[1mtemporal workflow update execute \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an update handler. An update\ncan change the state of a Workflow Execution and return a response:\n\n```\ntemporal workflow update \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"Input\": \"As-JSON\"}'\n```"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete or fail. You can also use this to wait for an existing\nupdate to complete, by submitting an existing update ID.\n\n```\ntemporal workflow update execute \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.UpdateOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"type": "name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowUpdateStartCommand struct {
Parent *TemporalWorkflowUpdateCommand
Command cobra.Command
UpdateOptions
PayloadInputOptions
}

func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *TemporalWorkflowUpdateCommand) *TemporalWorkflowUpdateStartCommand {
var s TemporalWorkflowUpdateStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "start [flags]"
s.Command.Short = "Send an Update and wait for it to be accepted"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.UpdateOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.Name, "name", "", "Handler method name. Required. Aliased as \"--type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID. Must be unique per Workflow Execution.")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, updates the currently-running Workflow Execution.")
s.Command.Flags().StringVar(&s.FirstExecutionRunId, "first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"type": "name",
}))
Expand Down
44 changes: 31 additions & 13 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
}
defer cl.Close()

// Get input payloads
input, err := c.buildRawInputPayloads()
if err != nil {
return err
Expand Down Expand Up @@ -187,33 +186,53 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string)
return nil
}

func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
func (c *TemporalWorkflowUpdateStartCommand) run(cctx *CommandContext, args []string) error {
return workflowUpdateHelper(cctx, c.Parent.Parent.ClientOptions, c.PayloadInputOptions, c.UpdateOptions, client.WorkflowUpdateStageAccepted)
}

func (c *TemporalWorkflowUpdateExecuteCommand) run(cctx *CommandContext, args []string) error {
return workflowUpdateHelper(cctx, c.Parent.Parent.ClientOptions, c.PayloadInputOptions, c.UpdateOptions, client.WorkflowUpdateStageCompleted)
}

func workflowUpdateHelper(cctx *CommandContext,
clientOpts ClientOptions,
inputOpts PayloadInputOptions,
updateOpts UpdateOptions,
waitForStage client.WorkflowUpdateStage,
) error {
cl, err := clientOpts.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get raw input
input, err := c.buildRawInput()
input, err := inputOpts.buildRawInput()
if err != nil {
return err
}

request := client.UpdateWorkflowOptions{
WorkflowID: c.WorkflowId,
RunID: c.RunId,
UpdateName: c.Name,
UpdateID: c.UpdateId,
FirstExecutionRunID: c.FirstExecutionRunId,
WorkflowID: updateOpts.WorkflowId,
RunID: updateOpts.RunId,
UpdateName: updateOpts.Name,
UpdateID: updateOpts.UpdateId,
FirstExecutionRunID: updateOpts.FirstExecutionRunId,
Args: input,
WaitForStage: client.WorkflowUpdateStageCompleted,
WaitForStage: waitForStage,
}

updateHandle, err := cl.UpdateWorkflow(cctx, request)
if err != nil {
return fmt.Errorf("unable to update workflow: %w", err)
}
if waitForStage == client.WorkflowUpdateStageAccepted {
return cctx.Printer.PrintStructured(
struct {
Name string `json:"name"`
UpdateID string `json:"updateId"`
}{Name: updateOpts.Name, UpdateID: updateHandle.UpdateID()},
printer.StructuredOptions{})
}

var valuePtr interface{}
err = updateHandle.Get(cctx, &valuePtr)
Expand All @@ -226,7 +245,7 @@ func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string)
Name string `json:"name"`
UpdateID string `json:"updateId"`
Result interface{} `json:"result"`
}{Name: c.Name, UpdateID: updateHandle.UpdateID(), Result: valuePtr},
}{Name: updateOpts.Name, UpdateID: updateHandle.UpdateID(), Result: valuePtr},
printer.StructuredOptions{})
}

Expand Down Expand Up @@ -330,7 +349,6 @@ func queryHelper(cctx *CommandContext,
}
defer cl.Close()

// Get input payloads
input, err := inputOpts.buildRawInputPayloads()
if err != nil {
return err
Expand Down
Loading

0 comments on commit b618785

Please sign in to comment.