From 53d944ed7ad1bac58f2b03139aaa9fac08a8a59a Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart Date: Mon, 5 Feb 2024 15:05:52 -0800 Subject: [PATCH 1/2] Implement `workflow cancel` command --- temporalcli/commands.gen.go | 8 ++- temporalcli/commands.workflow.go | 33 ++++++++- temporalcli/commands.workflow_test.go | 96 +++++++++++++++++++++++++++ temporalcli/commandsmd/commands.md | 19 +++++- 4 files changed, 152 insertions(+), 4 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index f5bd2c27..fc5c93a4 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -378,6 +378,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * type TemporalWorkflowCancelCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command + SingleWorkflowOrBatchOptions } func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowCancelCommand { @@ -386,8 +387,13 @@ func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWork s.Command.DisableFlagsInUseLine = true s.Command.Use = "cancel [flags]" s.Command.Short = "Cancel a Workflow Execution." - s.Command.Long = "TODO" + if hasHighlighting { + s.Command.Long = "The \x1b[1mtemporal workflow cancel\x1b[0m command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a \x1b[1mWorkflowExecutionCancelRequested\x1b[0m event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n\x1b[1mtemporal workflow cancel --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\n\x1b[1mtemporal workflow cancel --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command." + } else { + s.Command.Long = "The `temporal workflow cancel` command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n```\ntemporal workflow cancel --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n\n```\ntemporal workflow cancel --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command." + } s.Command.Args = cobra.NoArgs + s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 4e3fe790..596daa80 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -12,8 +12,37 @@ import ( "go.temporal.io/sdk/client" ) -func (*TemporalWorkflowCancelCommand) run(*CommandContext, []string) error { - return fmt.Errorf("TODO") +func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error { + cl, err := c.Parent.ClientOptions.dialClient(cctx) + if err != nil { + return err + } + defer cl.Close() + + exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{}) + if err != nil { + return err + } + + // Run single or batch + if exec != nil { + err = cl.CancelWorkflow(cctx, exec.WorkflowId, exec.RunId) + if err != nil { + return fmt.Errorf("failed to cancel workflow: %w", err) + } + cctx.Printer.Println("Canceled workflow") + } else if batchReq != nil { + batchReq.Operation = &workflowservice.StartBatchOperationRequest_CancellationOperation{ + CancellationOperation: &batch.BatchOperationCancellation{ + Identity: clientIdentity(), + }, + } + if err := startBatchJob(cctx, cl, batchReq); err != nil { + return err + } + } + + return nil } func (*TemporalWorkflowDeleteCommand) run(*CommandContext, []string) error { diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 77f1c04b..1ea76616 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -275,3 +275,99 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult } return res } + +func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() { + // Make workflow wait for cancel and then return the context's error + s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + ctx.Done().Receive(ctx, nil) + return nil, ctx.Err() + }) + + // Start the workflow + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue}, + DevWorkflow, + "ignored", + ) + s.NoError(err) + + // Send cancel + res := s.Execute( + "workflow", "cancel", + "--address", s.Address(), + "-w", run.GetID(), + ) + s.NoError(res.Err) + + // Confirm workflow was cancelled + s.Error(workflow.ErrCanceled, run.Get(s.Context, nil)) +} + +func (s *SharedServerSuite) TestWorkflow_Cancel_BatchWorkflowSuccess() { + res := s.testCancelBatchWorkflow(false) + s.Contains(res.Stdout.String(), "approximately 5 workflow(s)") + s.Contains(res.Stdout.String(), "Started batch") +} + +func (s *SharedServerSuite) TestWorkflow_Cancel_BatchWorkflowSuccessJSON() { + res := s.testCancelBatchWorkflow(true) + var jsonRes map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes)) + s.NotEmpty(jsonRes["batchJobId"]) +} + +func (s *SharedServerSuite) testCancelBatchWorkflow(json bool) *CommandResult { + // Make workflow wait for cancel and then return the context's error + s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + ctx.Done().Receive(ctx, nil) + return nil, ctx.Err() + }) + + // Start 5 workflows + runs := make([]client.WorkflowRun, 5) + searchAttr := "keyword-" + uuid.NewString() + for i := range runs { + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker.Options.TaskQueue, + SearchAttributes: map[string]any{"CustomKeywordField": searchAttr}, + }, + DevWorkflow, + "ignored", + ) + s.NoError(err) + runs[i] = run + } + + // Wait for all to appear in list + s.Eventually(func() bool { + resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{ + Query: "CustomKeywordField = '" + searchAttr + "'", + }) + s.NoError(err) + return len(resp.Executions) == len(runs) + }, 3*time.Second, 100*time.Millisecond) + + // Send batch cancel with a "y" for non-json or "--yes" for json + args := []string{ + "workflow", "cancel", + "--address", s.Address(), + "--query", "CustomKeywordField = '" + searchAttr + "'", + "--reason", "cancellation-test", + } + if json { + args = append(args, "--yes", "-o", "json") + } else { + s.CommandHarness.Stdin.WriteString("y\n") + } + res := s.Execute(args...) + s.NoError(res.Err) + + // Confirm that all workflows fail with ErrCanceled + for _, run := range runs { + s.Error(workflow.ErrCanceled, run.Get(s.Context, nil)) + } + return res +} diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 6df16a47..3211adec 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -209,7 +209,24 @@ Workflow commands use this syntax:`temporal workflow COMMAND [ARGS]`. ### temporal workflow cancel: Cancel a Workflow Execution. -TODO +The `temporal workflow cancel` command is used to cancel a [Workflow Execution](/concepts/what-is-a-workflow-execution). Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work. + +Executions may be cancelled by [ID](/concepts/what-is-a-workflow-id): +``` +temporal workflow cancel --workflow-id MyWorkflowId +``` + +...or in bulk via a visibility query [list filter](/concepts/what-is-a-list-filter): + +``` +temporal workflow cancel --query=MyQuery +``` + +Use the options listed below to change the behavior of this command. + +#### Options + +Includes options set for [single workflow or batch](#options-set-single-workflow-or-batch) ### temporal workflow count: Count Workflow Executions. From 904906fbfb287ef001c9480b82ed46beb5155c7e Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart Date: Wed, 7 Feb 2024 11:28:20 -0800 Subject: [PATCH 2/2] Update single-workflow-or-batch message to be generic across batch ops --- temporalcli/commands.gen.go | 6 +++--- temporalcli/commandsmd/commands.md | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index fc5c93a4..347ce01d 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -388,9 +388,9 @@ func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWork s.Command.Use = "cancel [flags]" s.Command.Short = "Cancel a Workflow Execution." if hasHighlighting { - s.Command.Long = "The \x1b[1mtemporal workflow cancel\x1b[0m command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a \x1b[1mWorkflowExecutionCancelRequested\x1b[0m event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n\x1b[1mtemporal workflow cancel --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\n\x1b[1mtemporal workflow cancel --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command." + s.Command.Long = "The \x1b[1mtemporal workflow cancel\x1b[0m command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a \x1b[1mWorkflowExecutionCancelRequested\x1b[0m event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n\x1b[1mtemporal workflow cancel --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\x1b[1mtemporal workflow cancel --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command." } else { - s.Command.Long = "The `temporal workflow cancel` command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n```\ntemporal workflow cancel --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n\n```\ntemporal workflow cancel --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command." + s.Command.Long = "The `temporal workflow cancel` command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n```\ntemporal workflow cancel --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n```\ntemporal workflow cancel --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command." } s.Command.Args = cobra.NoArgs s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags()) @@ -653,7 +653,7 @@ type SingleWorkflowOrBatchOptions struct { func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.") f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.") - f.StringVarP(&v.Query, "query", "q", "", "Start a batch to Signal Workflow Executions with given List Filter. Either this or Workflow Id must be set.") + f.StringVarP(&v.Query, "query", "q", "", "Start a batch to operate on Workflow Executions with given List Filter. Either this or Workflow Id must be set.") f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name.") f.BoolVarP(&v.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.") } diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 3211adec..7367b86c 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -217,7 +217,6 @@ temporal workflow cancel --workflow-id MyWorkflowId ``` ...or in bulk via a visibility query [list filter](/concepts/what-is-a-list-filter): - ``` temporal workflow cancel --query=MyQuery ``` @@ -355,7 +354,7 @@ Includes options set for [payload input](#options-set-for-payload-input). * `--workflow-id`, `-w` (string) - Workflow Id. Either this or query must be set. * `--run-id`, `-r` (string) - Run Id. Cannot be set when query is set. -* `--query`, `-q` (string) - Start a batch to Signal Workflow Executions with given List Filter. Either this or +* `--query`, `-q` (string) - Start a batch to operate on Workflow Executions with given List Filter. Either this or Workflow Id must be set. * `--reason` (string) - Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name. * `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.