Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch workflow delete #201

Merged
merged 2 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions batch/batch_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ func BatchSignal(c *cli.Context) error {
return startBatchJob(c, &req)
}

// BatchDelete delete a list of workflows
func BatchDelete(c *cli.Context) error {
operator := common.GetCurrentUserFromEnv()

req := workflowservice.StartBatchOperationRequest{
Operation: &workflowservice.StartBatchOperationRequest_DeletionOperation{
DeletionOperation: &batch.BatchOperationDeletion{
Identity: operator,
},
},
}

return startBatchJob(c, &req)
}

// startBatchJob starts a batch job
func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationRequest) error {
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
Expand Down
3 changes: 2 additions & 1 deletion common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const (
FlagAddrDefinition = "The host and port (formatted as host:port) for the Temporal Frontend Service."
FlagNSAliasDefinition = "Identifies a Namespace in the Temporal Workflow."
FlagMetadataDefinition = "Contains gRPC metadata to send with requests (formatted as key=value)."
FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates"
FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates."
FlagTLSCertPathDefinition = "Path to x509 certificate."
FlagTLSKeyPathDefinition = "Path to private certificate key."
FlagTLSCaPathDefinition = "Path to server CA certificate."
Expand Down Expand Up @@ -65,6 +65,7 @@ const (
FlagCancelWorkflow = "Cancel Workflow Execution with given Workflow Id."
FlagWorkflowIDTerminate = "Terminate Workflow Execution with given Workflow Id."
FlagQueryTerminate = "Terminate Workflow Executions with given List Filter."
FlagQueryDelete = "Delete Workflow Executions with given List Filter."
FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others."
FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query."
FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id."
Expand Down
45 changes: 45 additions & 0 deletions tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,51 @@ func (s *e2eSuite) TestWorkflowTerminate_Batch() {
}, 10*time.Second, time.Second, "timed out awaiting for workflows termination")
}

func (s *e2eSuite) TestWorkflowDelete_Batch() {
s.T().Parallel()

testserver, app, _ := s.setUpTestEnvironment()
defer func() {
_ = testserver.Stop()
}()

w := s.newWorker(testserver, testTq, func(r worker.Registry) {
r.RegisterWorkflow(awaitsignal.Workflow)
})
defer w.Stop()

c := testserver.Client()

ids := []string{"1", "2", "3"}
for _, id := range ids {
_, err := c.ExecuteWorkflow(
context.Background(),
sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq},
awaitsignal.Workflow,
)
s.NoError(err)
}

err := app.Run([]string{"", "workflow", "delete", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace})
s.NoError(err)

awaitTaskQueuePoller(s, c, testTq)
awaitBatchJob(s, c, testNamespace)

s.Eventually(func() bool {
wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{
Namespace: testNamespace,
})
s.NoError(err)

if len(wfs.GetExecutions()) == 1 && wfs.GetExecutions()[0].GetExecution().GetWorkflowId() == "3" {
return true
}

return false
}, 10*time.Second, time.Second, "timed out awaiting for workflows termination")
}

// awaitTaskQueuePoller used mostly for more explicit failure message
func awaitTaskQueuePoller(s *e2eSuite, c sdkclient.Client, taskqueue string) {
s.Eventually(func() bool {
Expand Down
32 changes: 31 additions & 1 deletion workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,37 @@ func NewWorkflowCommands() []*cli.Command {
Name: "delete",
Usage: common.DeleteWorkflowDefinition,
UsageText: common.WorkflowDeleteUsageText,
Flags: common.FlagsForExecution,
Flags: []cli.Flag{
&cli.StringFlag{
Name: common.FlagWorkflowID,
Aliases: common.FlagWorkflowIDAlias,
Usage: common.FlagWorkflowIDTerminate,
Category: common.CategoryMain,
},
&cli.StringFlag{
Name: common.FlagRunID,
Aliases: common.FlagRunIDAlias,
Usage: common.FlagRunIdDefinition,
Category: common.CategoryMain,
},
&cli.StringFlag{
Name: common.FlagQuery,
Aliases: common.FlagQueryAlias,
Usage: common.FlagQueryDelete,
Category: common.CategoryMain,
},
&cli.StringFlag{
Name: common.FlagReason,
Usage: common.FlagReasonDefinition,
Category: common.CategoryMain,
},
&cli.BoolFlag{
Name: common.FlagYes,
Aliases: common.FlagYesAlias,
Usage: common.FlagYesDefinition,
Category: common.CategoryMain,
},
},
Action: func(c *cli.Context) error {
return DeleteWorkflow(c)
},
Expand Down
23 changes: 16 additions & 7 deletions workflow/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ func printReplayableHistory(c *cli.Context, iter iterator.Iterator[*historypb.Hi
func TerminateWorkflow(c *cli.Context) error {
if c.String(common.FlagQuery) != "" {
return batch.BatchTerminate(c)
} else {
return terminateWorkflowByID(c)
}

return terminateWorkflowByID(c)
}

// terminateWorkflowByID terminates a single workflow execution
Expand Down Expand Up @@ -406,8 +406,17 @@ func terminateWorkflowByID(c *cli.Context) error {
return nil
}

// DeleteWorkflow deletes a workflow execution.
// DeleteWorkflow deletes workflow executions based on filter parameters
func DeleteWorkflow(c *cli.Context) error {
if c.String(common.FlagQuery) != "" {
return batch.BatchDelete(c)
}

return deleteWorkflowByID(c)
}

// deleteWorkflowByID deletes a single workflow execution
func deleteWorkflowByID(c *cli.Context) error {
nsName, err := common.RequiredFlag(c, common.FlagNamespace)
if err != nil {
return err
Expand Down Expand Up @@ -439,9 +448,9 @@ func DeleteWorkflow(c *cli.Context) error {
func CancelWorkflow(c *cli.Context) error {
if c.String(common.FlagQuery) != "" {
return batch.BatchCancel(c)
} else {
return cancelWorkflowByID(c)
}

return cancelWorkflowByID(c)
}

// cancelWorkflowByID cancels a single workflow execution
Expand Down Expand Up @@ -472,9 +481,9 @@ func cancelWorkflowByID(c *cli.Context) error {
func SignalWorkflow(c *cli.Context) error {
if c.String(common.FlagQuery) != "" {
return batch.BatchSignal(c)
} else {
return signalWorkflowByID(c)
}

return signalWorkflowByID(c)
}

// signalWorkflowByID signals a single workflow execution
Expand Down