Skip to content

Commit

Permalink
Add batch workflow delete
Browse files Browse the repository at this point in the history
  • Loading branch information
feedmeapples committed Apr 19, 2023
1 parent 81dd6c3 commit 33953f6
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 133 deletions.
19 changes: 17 additions & 2 deletions batch/batch_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ func BatchSignal(c *cli.Context) error {
return startBatchJob(c, &req)
}

// BatchDelete delete a list of workflows
func BatchDelete(c *cli.Context) error {
operator := common.GetCurrentUserFromEnv()

req := workflowservice.StartBatchOperationRequest{
Operation: &workflowservice.StartBatchOperationRequest_DeletionOperation{
DeletionOperation: &batch.BatchOperationDeletion{
Identity: operator,
},
},
}

return startBatchJob(c, &req)
}

// startBatchJob starts a batch job
func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationRequest) error {
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
Expand Down Expand Up @@ -169,10 +184,10 @@ func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationReque
req.VisibilityQuery = query
req.Reason = reason

client := client.CFactory.FrontendClient(c)
ctx, cancel := common.NewContext(c)
defer cancel()
_, err = client.StartBatchOperation(ctx, req)

_, err = sdk.WorkflowService().StartBatchOperation(ctx, req)
if err != nil {
return fmt.Errorf("unable to start batch job: %w", err)
}
Expand Down
178 changes: 90 additions & 88 deletions common/defs-flags.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
)

// Set by GoReleaser using ldflags
var Version = "DEV"
var Version = "0.0.0-DEV"

const (
ClientNameCLI = "temporal-cli"
Expand Down Expand Up @@ -48,7 +48,7 @@ var (
)

func Init() {
if Version == "DEV" {
if Version == "0.0.0-DEV" {
if info, ok := debug.ReadBuildInfo(); ok && info.Main.Version != "(devel)" {
Version = info.Main.Version
}
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestClientIntegrationSuite(t *testing.T) {

func (s *e2eSuite) SetupSuite() {
s.app = app.BuildApp()
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{})
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{LogLevel: "error"})
s.NoError(err)
s.ts = server
}
Expand Down
25 changes: 25 additions & 0 deletions tests/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package tests

import (
"sync"
"time"
)

// AwaitWaitGroup calls Wait on the given wait
// Returns true if the Wait() call succeeded before the timeout
// Returns false if the Wait() did not return before the timeout
func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
doneC := make(chan struct{})

go func() {
wg.Wait()
close(doneC)
}()

select {
case <-doneC:
return true
case <-time.After(timeout):
return false
}
}
80 changes: 77 additions & 3 deletions tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package tests

import (
"context"
"io/ioutil"
"os"
"sync"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/cli/tests/workflows/helloworld"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

const (
testTq = "test-queue"
testTq = "test-queue"
testNamespace = "default"
)

func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {
Expand Down Expand Up @@ -42,7 +46,7 @@ func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {
// save history to file
historyFile := uuid.New() + ".json"
logs := s.writer.GetContent()
err = ioutil.WriteFile(historyFile, []byte(logs), 0644)
err = os.WriteFile(historyFile, []byte(logs), 0644)
s.NoError(err)
defer os.Remove(historyFile)

Expand All @@ -51,3 +55,73 @@ func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {
err = replayer.ReplayWorkflowHistoryFromJSONFile(nil, historyFile)
s.NoError(err)
}

func (s *e2eSuite) TestWorkflowDelete_Batch() {
c := s.ts.Client()

s.NewWorker(testTq, func(r worker.Registry) {
r.RegisterWorkflow(helloworld.Workflow)
r.RegisterActivity(helloworld.Activity)
})

ids := []string{"1", "2", "3"}

for _, id := range ids {
wfr, err := c.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{ID: id, TaskQueue: testTq},
helloworld.Workflow,
"world",
)
s.NoError(err)

var result string
err = wfr.Get(context.Background(), &result)
s.NoError(err)
}

// start batch delete operation
err := s.app.Run([]string{"", "workflow", "delete", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes"})
s.NoError(err)

// wait for the delete operation to complete
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
time.Sleep(1 * time.Second)

resp, err := c.WorkflowService().ListBatchOperations(context.Background(),
&workflowservice.ListBatchOperationsRequest{Namespace: testNamespace})
s.NoError(err)

if len(resp.OperationInfo) == 0 {
continue
}

s.Equal(1, len(resp.OperationInfo)) // TODO ensure new server for each e2e test

deleteJob, err := c.WorkflowService().DescribeBatchOperation(context.Background(),
&workflowservice.DescribeBatchOperationRequest{
JobId: resp.OperationInfo[0].JobId,
Namespace: testNamespace,
})
s.NoError(err)
if deleteJob.State == enums.BATCH_OPERATION_STATE_COMPLETED {
wg.Done()
return
}
}
}()
s.True(AwaitWaitGroup(&wg, 10*time.Second))

_, err = c.DescribeWorkflowExecution(context.Background(), "1", "")
s.Error(err)

_, err = c.DescribeWorkflowExecution(context.Background(), "2", "")
s.Error(err)

w3, err := c.DescribeWorkflowExecution(context.Background(), "3", "")
s.NoError(err)
s.Equal(enums.WORKFLOW_EXECUTION_STATUS_COMPLETED, w3.WorkflowExecutionInfo.Status)
}
Loading

0 comments on commit 33953f6

Please sign in to comment.