diff --git a/batch/batch_commands.go b/batch/batch_commands.go index 0bd298c0..fec5ef95 100644 --- a/batch/batch_commands.go +++ b/batch/batch_commands.go @@ -135,6 +135,21 @@ func BatchSignal(c *cli.Context) error { return startBatchJob(c, &req) } +// BatchDelete delete a list of workflows +func BatchDelete(c *cli.Context) error { + operator := common.GetCurrentUserFromEnv() + + req := workflowservice.StartBatchOperationRequest{ + Operation: &workflowservice.StartBatchOperationRequest_DeletionOperation{ + DeletionOperation: &batch.BatchOperationDeletion{ + Identity: operator, + }, + }, + } + + return startBatchJob(c, &req) +} + // startBatchJob starts a batch job func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationRequest) error { namespace, err := common.RequiredFlag(c, common.FlagNamespace) diff --git a/common/defs-flags.go b/common/defs-flags.go index 72b765e2..89f680b1 100644 --- a/common/defs-flags.go +++ b/common/defs-flags.go @@ -6,7 +6,7 @@ const ( FlagAddrDefinition = "The host and port (formatted as host:port) for the Temporal Frontend Service." FlagNSAliasDefinition = "Identifies a Namespace in the Temporal Workflow." FlagMetadataDefinition = "Contains gRPC metadata to send with requests (formatted as key=value)." - FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates" + FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates." FlagTLSCertPathDefinition = "Path to x509 certificate." FlagTLSKeyPathDefinition = "Path to private certificate key." FlagTLSCaPathDefinition = "Path to server CA certificate." @@ -65,6 +65,7 @@ const ( FlagCancelWorkflow = "Cancel Workflow Execution with given Workflow Id." FlagWorkflowIDTerminate = "Terminate Workflow Execution with given Workflow Id." FlagQueryTerminate = "Terminate Workflow Executions with given List Filter." + FlagQueryDelete = "Delete Workflow Executions with given List Filter." FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others." FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query." FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id." diff --git a/tests/workflow_test.go b/tests/workflow_test.go index 97979635..53a36a63 100644 --- a/tests/workflow_test.go +++ b/tests/workflow_test.go @@ -300,6 +300,51 @@ func (s *e2eSuite) TestWorkflowTerminate_Batch() { }, 10*time.Second, time.Second, "timed out awaiting for workflows termination") } +func (s *e2eSuite) TestWorkflowDelete_Batch() { + s.T().Parallel() + + testserver, app, _ := s.setUpTestEnvironment() + defer func() { + _ = testserver.Stop() + }() + + w := s.newWorker(testserver, testTq, func(r worker.Registry) { + r.RegisterWorkflow(awaitsignal.Workflow) + }) + defer w.Stop() + + c := testserver.Client() + + ids := []string{"1", "2", "3"} + for _, id := range ids { + _, err := c.ExecuteWorkflow( + context.Background(), + sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq}, + awaitsignal.Workflow, + ) + s.NoError(err) + } + + err := app.Run([]string{"", "workflow", "delete", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace}) + s.NoError(err) + + awaitTaskQueuePoller(s, c, testTq) + awaitBatchJob(s, c, testNamespace) + + s.Eventually(func() bool { + wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: testNamespace, + }) + s.NoError(err) + + if len(wfs.GetExecutions()) == 1 && wfs.GetExecutions()[0].GetExecution().GetWorkflowId() == "3" { + return true + } + + return false + }, 10*time.Second, time.Second, "timed out awaiting for workflows termination") +} + // awaitTaskQueuePoller used mostly for more explicit failure message func awaitTaskQueuePoller(s *e2eSuite, c sdkclient.Client, taskqueue string) { s.Eventually(func() bool { diff --git a/workflow/workflow.go b/workflow/workflow.go index fc8637fe..b4ba617d 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -245,7 +245,37 @@ func NewWorkflowCommands() []*cli.Command { Name: "delete", Usage: common.DeleteWorkflowDefinition, UsageText: common.WorkflowDeleteUsageText, - Flags: common.FlagsForExecution, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: common.FlagWorkflowID, + Aliases: common.FlagWorkflowIDAlias, + Usage: common.FlagWorkflowIDTerminate, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagRunID, + Aliases: common.FlagRunIDAlias, + Usage: common.FlagRunIdDefinition, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagQuery, + Aliases: common.FlagQueryAlias, + Usage: common.FlagQueryDelete, + Category: common.CategoryMain, + }, + &cli.StringFlag{ + Name: common.FlagReason, + Usage: common.FlagReasonDefinition, + Category: common.CategoryMain, + }, + &cli.BoolFlag{ + Name: common.FlagYes, + Aliases: common.FlagYesAlias, + Usage: common.FlagYesDefinition, + Category: common.CategoryMain, + }, + }, Action: func(c *cli.Context) error { return DeleteWorkflow(c) }, diff --git a/workflow/workflow_commands.go b/workflow/workflow_commands.go index ecd6f451..71890c58 100644 --- a/workflow/workflow_commands.go +++ b/workflow/workflow_commands.go @@ -375,9 +375,9 @@ func printReplayableHistory(c *cli.Context, iter iterator.Iterator[*historypb.Hi func TerminateWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchTerminate(c) - } else { - return terminateWorkflowByID(c) } + + return terminateWorkflowByID(c) } // terminateWorkflowByID terminates a single workflow execution @@ -406,8 +406,17 @@ func terminateWorkflowByID(c *cli.Context) error { return nil } -// DeleteWorkflow deletes a workflow execution. +// DeleteWorkflow deletes workflow executions based on filter parameters func DeleteWorkflow(c *cli.Context) error { + if c.String(common.FlagQuery) != "" { + return batch.BatchDelete(c) + } + + return deleteWorkflowByID(c) +} + +// deleteWorkflowByID deletes a single workflow execution +func deleteWorkflowByID(c *cli.Context) error { nsName, err := common.RequiredFlag(c, common.FlagNamespace) if err != nil { return err @@ -439,9 +448,9 @@ func DeleteWorkflow(c *cli.Context) error { func CancelWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchCancel(c) - } else { - return cancelWorkflowByID(c) } + + return cancelWorkflowByID(c) } // cancelWorkflowByID cancels a single workflow execution @@ -472,9 +481,9 @@ func cancelWorkflowByID(c *cli.Context) error { func SignalWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchSignal(c) - } else { - return signalWorkflowByID(c) } + + return signalWorkflowByID(c) } // signalWorkflowByID signals a single workflow execution