Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement workflow cancel command #435

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
type TemporalWorkflowCancelCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SingleWorkflowOrBatchOptions
}

func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowCancelCommand {
Expand All @@ -386,8 +387,13 @@ func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWork
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "cancel [flags]"
s.Command.Short = "Cancel a Workflow Execution."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow cancel\x1b[0m command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a \x1b[1mWorkflowExecutionCancelRequested\x1b[0m event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n\x1b[1mtemporal workflow cancel --workflow-id MyWorkflowId\x1b[0m\n\n...or in bulk via a visibility query list filter:\n\x1b[1mtemporal workflow cancel --query=MyQuery\x1b[0m\n\nUse the options listed below to change the behavior of this command."
} else {
s.Command.Long = "The `temporal workflow cancel` command is used to cancel a Workflow Execution. Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.\n\nExecutions may be cancelled by ID:\n```\ntemporal workflow cancel --workflow-id MyWorkflowId\n```\n\n...or in bulk via a visibility query list filter:\n```\ntemporal workflow cancel --query=MyQuery\n```\n\nUse the options listed below to change the behavior of this command."
}
s.Command.Args = cobra.NoArgs
s.SingleWorkflowOrBatchOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down Expand Up @@ -647,7 +653,7 @@ type SingleWorkflowOrBatchOptions struct {
func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow Id. Either this or query must be set.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run Id. Cannot be set when query is set.")
f.StringVarP(&v.Query, "query", "q", "", "Start a batch to Signal Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
f.StringVarP(&v.Query, "query", "q", "", "Start a batch to operate on Workflow Executions with given List Filter. Either this or Workflow Id must be set.")
f.StringVar(&v.Reason, "reason", "", "Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
}
Expand Down
33 changes: 31 additions & 2 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,37 @@ import (
"go.temporal.io/sdk/client"
)

func (*TemporalWorkflowCancelCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

exec, batchReq, err := c.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{})
if err != nil {
return err
}

// Run single or batch
if exec != nil {
err = cl.CancelWorkflow(cctx, exec.WorkflowId, exec.RunId)
if err != nil {
return fmt.Errorf("failed to cancel workflow: %w", err)
}
cctx.Printer.Println("Canceled workflow")
} else if batchReq != nil {
batchReq.Operation = &workflowservice.StartBatchOperationRequest_CancellationOperation{
CancellationOperation: &batch.BatchOperationCancellation{
Identity: clientIdentity(),
},
}
if err := startBatchJob(cctx, cl, batchReq); err != nil {
return err
}
}

return nil
}

func (*TemporalWorkflowDeleteCommand) run(*CommandContext, []string) error {
Expand Down
96 changes: 96 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,99 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult
}
return res
}

func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() {
// Make workflow wait for cancel and then return the context's error
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

// Send cancel
res := s.Execute(
"workflow", "cancel",
"--address", s.Address(),
"-w", run.GetID(),
)
s.NoError(res.Err)

// Confirm workflow was cancelled
s.Error(workflow.ErrCanceled, run.Get(s.Context, nil))
}

func (s *SharedServerSuite) TestWorkflow_Cancel_BatchWorkflowSuccess() {
res := s.testCancelBatchWorkflow(false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.Contains(res.Stdout.String(), "Started batch")
}

func (s *SharedServerSuite) TestWorkflow_Cancel_BatchWorkflowSuccessJSON() {
res := s.testCancelBatchWorkflow(true)
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testCancelBatchWorkflow(json bool) *CommandResult {
// Make workflow wait for cancel and then return the context's error
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker.Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
runs[i] = run
}

// Wait for all to appear in list
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) == len(runs)
}, 3*time.Second, 100*time.Millisecond)

// Send batch cancel with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "cancel",
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "cancellation-test",
}
if json {
args = append(args, "--yes", "-o", "json")
} else {
s.CommandHarness.Stdin.WriteString("y\n")
}
res := s.Execute(args...)
s.NoError(res.Err)

// Confirm that all workflows fail with ErrCanceled
for _, run := range runs {
s.Error(workflow.ErrCanceled, run.Get(s.Context, nil))
}
return res
}
20 changes: 18 additions & 2 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,23 @@ Workflow commands use this syntax:`temporal workflow COMMAND [ARGS]`.

### temporal workflow cancel: Cancel a Workflow Execution.

TODO
The `temporal workflow cancel` command is used to cancel a [Workflow Execution](/concepts/what-is-a-workflow-execution). Canceling a running Workflow Execution records a `WorkflowExecutionCancelRequested` event in the Event History. A new Command Task will be scheduled, and the Workflow Execution will perform cleanup work.

Executions may be cancelled by [ID](/concepts/what-is-a-workflow-id):
```
temporal workflow cancel --workflow-id MyWorkflowId
```

...or in bulk via a visibility query [list filter](/concepts/what-is-a-list-filter):
```
temporal workflow cancel --query=MyQuery
```

Use the options listed below to change the behavior of this command.

#### Options

Includes options set for [single workflow or batch](#options-set-single-workflow-or-batch)

### temporal workflow count: Count Workflow Executions.

Expand Down Expand Up @@ -338,7 +354,7 @@ Includes options set for [payload input](#options-set-for-payload-input).

* `--workflow-id`, `-w` (string) - Workflow Id. Either this or query must be set.
* `--run-id`, `-r` (string) - Run Id. Cannot be set when query is set.
* `--query`, `-q` (string) - Start a batch to Signal Workflow Executions with given List Filter. Either this or
* `--query`, `-q` (string) - Start a batch to operate on Workflow Executions with given List Filter. Either this or
Workflow Id must be set.
* `--reason` (string) - Reason to perform batch. Only allowed if query is present unless the command specifies otherwise. Defaults to message with the current user's name.
* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.
Expand Down
Loading