diff --git a/app/app_test.go b/app/app_test.go index 660de347..abcee95b 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -20,7 +20,6 @@ import ( sconfig "github.com/temporalio/cli/server/config" "github.com/urfave/cli/v2" commonpb "go.temporal.io/api/common/v1" - "go.temporal.io/api/enums/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/operatorservice/v1" @@ -331,7 +330,7 @@ func assertServerHealth(ctx context.Context, t *testing.T, opts sdkclient.Option t.Error(ctx.Err()) break } - resp, err := c.DescribeTaskQueue(ctx, "temporal-sys-tq-scanner-taskqueue-0", enums.TASK_QUEUE_TYPE_WORKFLOW) + resp, err := c.DescribeTaskQueue(ctx, "temporal-sys-tq-scanner-taskqueue-0", enumspb.TASK_QUEUE_TYPE_WORKFLOW) if err != nil { t.Error(err) } diff --git a/app/batch_test.go b/app/batch_test.go index cb59854b..e6a2fd31 100644 --- a/app/batch_test.go +++ b/app/batch_test.go @@ -2,7 +2,6 @@ package app_test import ( "github.com/golang/mock/gomock" - "github.com/stretchr/testify/mock" "go.temporal.io/api/workflowservice/v1" ) @@ -30,27 +29,3 @@ func (s *cliAppSuite) TestStopBatchJob() { s.Nil(err) s.sdkClient.AssertExpectations(s.T()) } - -func (s *cliAppSuite) TestStartBatchJob_Signal() { - s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once() - s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1) - err := s.app.Run([]string{"", "workflow", "signal", "--name", "test-signal", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--input", "test-input", "--yes"}) - s.Nil(err) - s.sdkClient.AssertExpectations(s.T()) -} - -func (s *cliAppSuite) TestStartBatchJob_Terminate() { - s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once() - s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1) - err := s.app.Run([]string{"", "workflow", "terminate", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--yes"}) - s.Nil(err) - s.sdkClient.AssertExpectations(s.T()) -} - -func (s *cliAppSuite) TestStartBatchJob_Cancel() { - s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once() - s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1) - err := s.app.Run([]string{"", "workflow", "cancel", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--yes"}) - s.Nil(err) - s.sdkClient.AssertExpectations(s.T()) -} diff --git a/batch/batch_commands.go b/batch/batch_commands.go index 13fe7baa..0bd298c0 100644 --- a/batch/batch_commands.go +++ b/batch/batch_commands.go @@ -24,10 +24,10 @@ func DescribeBatchJob(c *cli.Context) error { } jobID := c.String(common.FlagJobID) - client := cliclient.Factory(c.App).FrontendClient(c) + fclient := cliclient.Factory(c.App).FrontendClient(c) ctx, cancel := common.NewContext(c) defer cancel() - resp, err := client.DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ + resp, err := fclient.DescribeBatchOperation(ctx, &workflowservice.DescribeBatchOperationRequest{ Namespace: namespace, JobId: jobID, }) @@ -49,7 +49,7 @@ func ListBatchJobs(c *cli.Context) error { if err != nil { return err } - client := cliclient.Factory(c.App).FrontendClient(c) + fclient := cliclient.Factory(c.App).FrontendClient(c) paginationFunc := func(npt []byte) ([]interface{}, []byte, error) { var items []interface{} @@ -57,7 +57,7 @@ func ListBatchJobs(c *cli.Context) error { ctx, cancel := common.NewContext(c) defer cancel() - resp, err := client.ListBatchOperations(ctx, &workflowservice.ListBatchOperationsRequest{ + resp, err := fclient.ListBatchOperations(ctx, &workflowservice.ListBatchOperationsRequest{ Namespace: namespace, }) @@ -170,10 +170,10 @@ func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationReque req.VisibilityQuery = query req.Reason = reason - client := cliclient.Factory(c.App).FrontendClient(c) ctx, cancel := common.NewContext(c) defer cancel() - _, err = client.StartBatchOperation(ctx, req) + + _, err = sdk.WorkflowService().StartBatchOperation(ctx, req) if err != nil { return fmt.Errorf("unable to start batch job: %w", err) } diff --git a/tests/workflow_test.go b/tests/workflow_test.go index 8723c988..97979635 100644 --- a/tests/workflow_test.go +++ b/tests/workflow_test.go @@ -6,16 +6,21 @@ import ( "math/rand" "os" "strconv" + "time" "github.com/pborman/uuid" + "github.com/temporalio/cli/tests/workflows/awaitsignal" "github.com/temporalio/cli/tests/workflows/helloworld" "github.com/temporalio/cli/tests/workflows/update" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) const ( - testTq = "test-queue" + testTq = "test-queue" + testNamespace = "default" ) func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() { @@ -94,25 +99,251 @@ func (s *e2eSuite) TestWorkflowUpdate() { defer signalWorkflow() // successful update with wait policy Completed, should show the result - err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", strconv.Itoa(randomInt)}) + err = app.Run([]string{"", "workflow", "update", "--context-timeout", "30", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", strconv.Itoa(randomInt)}) s.NoError(err) want := fmt.Sprintf(": %v", randomInt) s.Contains(writer.GetContent(), want) // successful update with wait policy Completed, passing first-execution-run-id - err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1", "--first-execution-run-id", wfr.GetRunID()}) + err = app.Run([]string{"", "workflow", "update", "--context-timeout", "30", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1", "--first-execution-run-id", wfr.GetRunID()}) s.NoError(err) // update rejected, when name is not available - err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "-i", "1"}) + err = app.Run([]string{"", "workflow", "update", "--context-timeout", "30", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "-i", "1"}) s.ErrorContains(err, "Required flag \"name\" not set") // update rejected, wrong workflowID - err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1"}) + err = app.Run([]string{"", "workflow", "update", "--context-timeout", "30", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1"}) s.ErrorContains(err, "update workflow failed") // update rejected, wrong update name - err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", "non-existent-name", "-i", "1"}) + err = app.Run([]string{"", "workflow", "update", "--context-timeout", "30", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", "non-existent-name", "-i", "1"}) s.ErrorContains(err, "update workflow failed: unknown update") +} + +func (s *e2eSuite) TestWorkflowCancel_Batch() { + s.T().Parallel() + + testserver, app, _ := s.setUpTestEnvironment() + defer func() { + _ = testserver.Stop() + }() + + w := s.newWorker(testserver, testTq, func(r worker.Registry) { + r.RegisterWorkflow(awaitsignal.Workflow) + }) + defer w.Stop() + + c := testserver.Client() + + ids := []string{"1", "2", "3"} + for _, id := range ids { + _, err := c.ExecuteWorkflow( + context.Background(), + sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq}, + awaitsignal.Workflow, + ) + s.NoError(err) + } + + err := app.Run([]string{"", "workflow", "cancel", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace}) + s.NoError(err) + + awaitTaskQueuePoller(s, c, testTq) + awaitBatchJob(s, c, testNamespace) + + s.Eventually(func() bool { + w1 := c.GetWorkflowHistory(context.Background(), "1", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + if expected := checkForEventType(w1, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED); !expected { + return false + } + + w2 := c.GetWorkflowHistory(context.Background(), "2", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + if expected := checkForEventType(w2, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED); !expected { + return false + } + + w3 := c.GetWorkflowHistory(context.Background(), "3", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + if expected := !checkForEventType(w3, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED); !expected { + return false + } + + return true + }, 10*time.Second, time.Second, "timed out awaiting for workflows cancellation") +} + +func (s *e2eSuite) TestWorkflowSignal_Batch() { + s.T().Parallel() + + testserver, app, _ := s.setUpTestEnvironment() + defer func() { + _ = testserver.Stop() + }() + + w := s.newWorker(testserver, testTq, func(r worker.Registry) { + r.RegisterWorkflow(awaitsignal.Workflow) + }) + defer w.Stop() + + c := testserver.Client() + + ids := []string{"1", "2", "3"} + for _, id := range ids { + _, err := c.ExecuteWorkflow( + context.Background(), + sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq}, + awaitsignal.Workflow, + ) + s.NoError(err) + } + + s.Eventually(func() bool { + wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: testNamespace, + }) + if err != nil { + return false + } + return len(wfs.GetExecutions()) == 3 + }, 10*time.Second, time.Second) + err := app.Run([]string{"", "workflow", "signal", "--name", awaitsignal.Done, "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace}) + s.NoError(err) + + awaitTaskQueuePoller(s, c, testTq) + awaitBatchJob(s, c, testNamespace) + + s.Eventually(func() bool { + wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: testNamespace, + }) + s.NoError(err) + + for _, wf := range wfs.GetExecutions() { + switch wf.GetExecution().GetWorkflowId() { + case "1", "2": + if wf.GetStatus() != enums.WORKFLOW_EXECUTION_STATUS_COMPLETED { + return false + } + case "3": + if wf.GetStatus() != enums.WORKFLOW_EXECUTION_STATUS_RUNNING { + return false + } + } + } + + return true + }, 3*time.Second, time.Second, "timed out awaiting for workflows completion after signal") +} + +func (s *e2eSuite) TestWorkflowTerminate_Batch() { + s.T().Parallel() + + testserver, app, _ := s.setUpTestEnvironment() + defer func() { + _ = testserver.Stop() + }() + + w := s.newWorker(testserver, testTq, func(r worker.Registry) { + r.RegisterWorkflow(awaitsignal.Workflow) + }) + defer w.Stop() + + c := testserver.Client() + + ids := []string{"1", "2", "3"} + for _, id := range ids { + _, err := c.ExecuteWorkflow( + context.Background(), + sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq}, + awaitsignal.Workflow, + ) + s.NoError(err) + } + + s.Eventually(func() bool { + wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: testNamespace, + }) + if err != nil { + return false + } + return len(wfs.GetExecutions()) == 3 + }, 10*time.Second, time.Second) + + err := app.Run([]string{"", "workflow", "terminate", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace}) + s.NoError(err) + + awaitTaskQueuePoller(s, c, testTq) + awaitBatchJob(s, c, testNamespace) + + s.Eventually(func() bool { + wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: testNamespace, + }) + s.NoError(err) + + for _, wf := range wfs.GetExecutions() { + switch wf.GetExecution().GetWorkflowId() { + case "1", "2": + if wf.GetStatus() != enums.WORKFLOW_EXECUTION_STATUS_TERMINATED { + return false + } + case "3": + if wf.GetStatus() != enums.WORKFLOW_EXECUTION_STATUS_RUNNING { + return false + } + } + } + + return true + }, 10*time.Second, time.Second, "timed out awaiting for workflows termination") +} + +// awaitTaskQueuePoller used mostly for more explicit failure message +func awaitTaskQueuePoller(s *e2eSuite, c sdkclient.Client, taskqueue string) { + s.Eventually(func() bool { + resp, err := c.DescribeTaskQueue(context.Background(), taskqueue, enums.TASK_QUEUE_TYPE_WORKFLOW) + if err != nil { + return false + } + return len(resp.GetPollers()) > 0 + }, 10*time.Second, time.Second, "no worker started for taskqueue "+taskqueue) +} + +func awaitBatchJob(s *e2eSuite, c sdkclient.Client, ns string) { + s.Eventually(func() bool { + resp, err := c.WorkflowService().ListBatchOperations(context.Background(), + &workflowservice.ListBatchOperationsRequest{Namespace: ns}) + if err != nil { + return false + } + if len(resp.OperationInfo) == 0 { + return false + } + + batchJob, err := c.WorkflowService().DescribeBatchOperation(context.Background(), + &workflowservice.DescribeBatchOperationRequest{ + JobId: resp.OperationInfo[0].JobId, + Namespace: ns, + }) + if err != nil { + return false + } + + return batchJob.State == enums.BATCH_OPERATION_STATE_COMPLETED + }, 10*time.Second, time.Second, "cancellation batch job timed out") +} + +func checkForEventType(events sdkclient.HistoryEventIterator, eventType enums.EventType) bool { + for events.HasNext() { + event, err := events.Next() + if err != nil { + break + } + if event.GetEventType() == eventType { + return true + } + } + return false } diff --git a/tests/workflows/awaitsignal/awaitsignal.go b/tests/workflows/awaitsignal/awaitsignal.go new file mode 100644 index 00000000..008d0c01 --- /dev/null +++ b/tests/workflows/awaitsignal/awaitsignal.go @@ -0,0 +1,14 @@ +package awaitsignal + +import ( + "go.temporal.io/sdk/workflow" +) + +const ( + Done = "done" +) + +func Workflow(ctx workflow.Context) error { + _ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil) + return nil +} diff --git a/workflow/workflow_commands.go b/workflow/workflow_commands.go index fdaa32db..ecd6f451 100644 --- a/workflow/workflow_commands.go +++ b/workflow/workflow_commands.go @@ -15,7 +15,7 @@ import ( "github.com/olekukonko/tablewriter" "github.com/pborman/uuid" "github.com/temporalio/cli/batch" - cliclient "github.com/temporalio/cli/client" + "github.com/temporalio/cli/client" "github.com/temporalio/cli/common" "github.com/temporalio/cli/common/stringify" "github.com/temporalio/cli/dataconverter" @@ -80,7 +80,7 @@ func StartWorkflowBaseArgs(c *cli.Context) ( // StartWorkflow starts a new workflow execution and optionally prints progress func StartWorkflow(c *cli.Context, printProgress bool) error { - sdkClient, err := cliclient.GetSDKClient(c) + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -281,7 +281,7 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error { } var maxFieldLength = c.Int(common.FlagMaxFieldLength) - sdkClient, err := cliclient.GetSDKClient(c) + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -371,17 +371,18 @@ func printReplayableHistory(c *cli.Context, iter iterator.Iterator[*historypb.Hi return nil } +// TerminateWorkflow terminates workflow executions based on filter parameters func TerminateWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchTerminate(c) } else { - return terminateWorkflow(c) + return terminateWorkflowByID(c) } } -// TerminateWorkflow terminates a workflow execution -func terminateWorkflow(c *cli.Context) error { - sdkClient, err := cliclient.GetSDKClient(c) +// terminateWorkflowByID terminates a single workflow execution +func terminateWorkflowByID(c *cli.Context) error { + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -414,10 +415,10 @@ func DeleteWorkflow(c *cli.Context) error { wid := c.String(common.FlagWorkflowID) rid := c.String(common.FlagRunID) - client := cliclient.Factory(c.App).FrontendClient(c) + fclient := client.Factory(c.App).FrontendClient(c) ctx, cancel := common.NewContext(c) defer cancel() - _, err = client.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ + _, err = fclient.DeleteWorkflowExecution(ctx, &workflowservice.DeleteWorkflowExecutionRequest{ Namespace: nsName, WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: wid, @@ -434,17 +435,18 @@ func DeleteWorkflow(c *cli.Context) error { return nil } +// CancelWorkflow cancels workflow executions based on filter parameters func CancelWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchCancel(c) } else { - return cancelWorkflow(c) + return cancelWorkflowByID(c) } } -// cancelWorkflow cancels a workflow execution -func cancelWorkflow(c *cli.Context) error { - sdkClient, err := cliclient.GetSDKClient(c) +// cancelWorkflowByID cancels a single workflow execution +func cancelWorkflowByID(c *cli.Context) error { + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -466,17 +468,18 @@ func cancelWorkflow(c *cli.Context) error { return nil } +// SignalWorkflow signals workflow executions based on filter parameters func SignalWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchSignal(c) } else { - return signalWorkflow(c) + return signalWorkflowByID(c) } } -// signalWorkflow signals a workflow execution -func signalWorkflow(c *cli.Context) error { - serviceClient := cliclient.Factory(c.App).FrontendClient(c) +// signalWorkflowByID signals a single workflow execution +func signalWorkflowByID(c *cli.Context) error { + serviceClient := client.Factory(c.App).FrontendClient(c) namespace, err := common.RequiredFlag(c, common.FlagNamespace) if err != nil { @@ -533,7 +536,7 @@ func QueryWorkflowUsingStackTrace(c *cli.Context) error { } func queryWorkflowHelper(c *cli.Context, queryType string) error { - serviceClient := cliclient.Factory(c.App).FrontendClient(c) + fclient := client.Factory(c.App).FrontendClient(c) namespace, err := common.RequiredFlag(c, common.FlagNamespace) if err != nil { @@ -573,7 +576,7 @@ func queryWorkflowHelper(c *cli.Context, queryType string) error { } queryRequest.QueryRejectCondition = rejectCondition } - queryResponse, err := serviceClient.QueryWorkflow(tcCtx, queryRequest) + queryResponse, err := fclient.QueryWorkflow(tcCtx, queryRequest) if err != nil { return fmt.Errorf("query workflow failed: %w", err) } @@ -592,7 +595,7 @@ func queryWorkflowHelper(c *cli.Context, queryType string) error { func ListWorkflow(c *cli.Context) error { archived := c.Bool(common.FlagArchive) - sdkClient, err := cliclient.GetSDKClient(c) + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -626,7 +629,7 @@ func ListWorkflow(c *cli.Context) error { // CountWorkflow count number of workflows func CountWorkflow(c *cli.Context) error { - sdkClient, err := cliclient.GetSDKClient(c) + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -660,7 +663,7 @@ func DescribeWorkflow(c *cli.Context) error { wid := c.String(common.FlagWorkflowID) rid := c.String(common.FlagRunID) - frontendClient := cliclient.Factory(c.App).FrontendClient(c) + frontendClient := client.Factory(c.App).FrontendClient(c) namespace, err := common.RequiredFlag(c, common.FlagNamespace) if err != nil { return err @@ -865,7 +868,7 @@ func ResetWorkflow(c *cli.Context) error { ctx, cancel := common.NewContext(c) defer cancel() - frontendClient := cliclient.Factory(c.App).FrontendClient(c) + frontendClient := client.Factory(c.App).FrontendClient(c) resetBaseRunID := rid workflowTaskFinishID := eventID @@ -1046,7 +1049,7 @@ func ResetInBatch(c *cli.Context) error { } } } else { - sdkClient, err := cliclient.GetSDKClient(c) + sdkClient, err := client.GetSDKClient(c) if err != nil { return err } @@ -1101,7 +1104,7 @@ func doReset(c *cli.Context, namespace, wid, rid string, params batchResetParams ctx, cancel := common.NewContext(c) defer cancel() - frontendClient := cliclient.Factory(c.App).FrontendClient(c) + frontendClient := client.Factory(c.App).FrontendClient(c) resp, err := frontendClient.DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{ Namespace: namespace, Execution: &commonpb.WorkflowExecution{ @@ -1468,7 +1471,7 @@ func UpdateWorkflow(c *cli.Context) error { func updateWorkflowHelper(c *cli.Context, request *sdkclient.UpdateWorkflowWithOptionsRequest) error { ctx, cancel := common.NewContext(c) defer cancel() - sdk, err := cliclient.GetSDKClient(c) + sdk, err := client.GetSDKClient(c) if err != nil { return err }