Skip to content

Commit

Permalink
Create E2E tests for batch jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
feedmeapples committed May 9, 2023
1 parent 075fc0d commit bbd2306
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 54 deletions.
3 changes: 1 addition & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
sconfig "github.com/temporalio/cli/server/config"
"github.com/urfave/cli/v2"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/operatorservice/v1"
Expand Down Expand Up @@ -331,7 +330,7 @@ func assertServerHealth(ctx context.Context, t *testing.T, opts sdkclient.Option
t.Error(ctx.Err())
break
}
resp, err := c.DescribeTaskQueue(ctx, "temporal-sys-tq-scanner-taskqueue-0", enums.TASK_QUEUE_TYPE_WORKFLOW)
resp, err := c.DescribeTaskQueue(ctx, "temporal-sys-tq-scanner-taskqueue-0", enumspb.TASK_QUEUE_TYPE_WORKFLOW)
if err != nil {
t.Error(err)
}
Expand Down
25 changes: 0 additions & 25 deletions app/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app_test

import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
"go.temporal.io/api/workflowservice/v1"
)

Expand Down Expand Up @@ -30,27 +29,3 @@ func (s *cliAppSuite) TestStopBatchJob() {
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestStartBatchJob_Signal() {
s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once()
s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1)
err := s.app.Run([]string{"", "workflow", "signal", "--name", "test-signal", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--input", "test-input", "--yes"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestStartBatchJob_Terminate() {
s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once()
s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1)
err := s.app.Run([]string{"", "workflow", "terminate", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--yes"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestStartBatchJob_Cancel() {
s.sdkClient.On("CountWorkflow", mock.Anything, mock.Anything).Return(&workflowservice.CountWorkflowExecutionsResponse{Count: 5}, nil).Once()
s.frontendClient.EXPECT().StartBatchOperation(gomock.Any(), gomock.Any()).Return(&workflowservice.StartBatchOperationResponse{}, nil).Times(1)
err := s.app.Run([]string{"", "workflow", "cancel", "--query", "WorkflowType='test-type'", "--reason", "test-reason", "--yes"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}
165 changes: 164 additions & 1 deletion tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import (
"math/rand"
"os"
"strconv"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/cli/tests/workflows/helloworld"
"github.com/temporalio/cli/tests/workflows/update"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

const (
testTq = "test-queue"
testTq = "test-queue"
testNamespace = "default"
)

func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {
Expand Down Expand Up @@ -114,5 +118,164 @@ func (s *e2eSuite) TestWorkflowUpdate() {
// update rejected, wrong update name
err = app.Run([]string{"", "workflow", "update", "--context-timeout", "10", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", "non-existent-name", "-i", "1"})
s.ErrorContains(err, "update workflow failed: unknown update")
}

func (s *e2eSuite) TestWorkflowCancel_Batch() {
s.T().Parallel()

testserver, app, _ := s.setUpTestEnvironment()
defer func() {
_ = testserver.Stop()
}()

c := testserver.Client()

ids := []string{"1", "2", "3"}

for _, id := range ids {
_, err := c.ExecuteWorkflow(
context.Background(),
sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq},
"non-existing-workflow-type",
)
s.NoError(err)
}

err := app.Run([]string{"", "workflow", "cancel", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace})
s.NoError(err)

// verify the job is complete
time.Sleep(1 * time.Second)
resp, err := c.WorkflowService().ListBatchOperations(context.Background(),
&workflowservice.ListBatchOperationsRequest{Namespace: testNamespace})
s.NoError(err)
s.Equal(1, len(resp.OperationInfo))
deleteJob, err := c.WorkflowService().DescribeBatchOperation(context.Background(),
&workflowservice.DescribeBatchOperationRequest{
JobId: resp.OperationInfo[0].JobId,
Namespace: testNamespace,
})
s.NoError(err)
s.Equal(enums.BATCH_OPERATION_STATE_COMPLETED, deleteJob.State)

w1 := c.GetWorkflowHistory(context.Background(), "1", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.True(checkForEventType(w1, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED), "Workflow 1 should have a cancellation event")

w2 := c.GetWorkflowHistory(context.Background(), "2", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.True(checkForEventType(w2, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED), "Workflow 2 should have a cancellation event")

w3 := c.GetWorkflowHistory(context.Background(), "3", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.False(checkForEventType(w3, enums.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED), "Workflow 3 should not have a cancellation event")
}

func (s *e2eSuite) TestWorkflowSignal_Batch() {
s.T().Parallel()

testserver, app, _ := s.setUpTestEnvironment()
defer func() {
_ = testserver.Stop()
}()

c := testserver.Client()

ids := []string{"1", "2", "3"}

for _, id := range ids {
_, err := c.ExecuteWorkflow(
context.Background(),
sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq},
"non-existing-workflow-type",
)
s.NoError(err)
}

err := app.Run([]string{"", "workflow", "signal", "--input", "\"testvalue\"", "--name", "test-signal", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace})
s.NoError(err)

// verify the job is complete
time.Sleep(1 * time.Second)
resp, err := c.WorkflowService().ListBatchOperations(context.Background(),
&workflowservice.ListBatchOperationsRequest{Namespace: testNamespace})
s.NoError(err)
s.Equal(1, len(resp.OperationInfo))
deleteJob, err := c.WorkflowService().DescribeBatchOperation(context.Background(),
&workflowservice.DescribeBatchOperationRequest{
JobId: resp.OperationInfo[0].JobId,
Namespace: testNamespace,
})
s.NoError(err)
s.Equal(enums.BATCH_OPERATION_STATE_COMPLETED, deleteJob.State)

w1 := c.GetWorkflowHistory(context.Background(), "1", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.True(checkForEventType(w1, enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED), "Workflow 1 should have received a signal")

w2 := c.GetWorkflowHistory(context.Background(), "2", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.True(checkForEventType(w2, enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED), "Workflow 2 should have received a signal")

w3 := c.GetWorkflowHistory(context.Background(), "3", "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
s.False(checkForEventType(w3, enums.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED), "Workflow 3 should not have received a signal")
}

func (s *e2eSuite) TestWorkflowTerminate_Batch() {
s.T().Parallel()

testserver, app, _ := s.setUpTestEnvironment()
defer func() {
_ = testserver.Stop()
}()

c := testserver.Client()

ids := []string{"1", "2", "3"}

for _, id := range ids {
_, err := c.ExecuteWorkflow(
context.Background(),
sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq},
"non-existing-workflow-type",
)
s.NoError(err)
}

err := app.Run([]string{"", "workflow", "terminate", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace})
s.NoError(err)

// verify the job is complete
time.Sleep(1 * time.Second)
resp, err := c.WorkflowService().ListBatchOperations(context.Background(),
&workflowservice.ListBatchOperationsRequest{Namespace: testNamespace})
s.NoError(err)
s.Equal(1, len(resp.OperationInfo))
deleteJob, err := c.WorkflowService().DescribeBatchOperation(context.Background(),
&workflowservice.DescribeBatchOperationRequest{
JobId: resp.OperationInfo[0].JobId,
Namespace: testNamespace,
})
s.NoError(err)
s.Equal(enums.BATCH_OPERATION_STATE_COMPLETED, deleteJob.State)

w1, err := c.DescribeWorkflowExecution(context.Background(), "1", "")
s.NoError(err)
s.Equal(enums.WORKFLOW_EXECUTION_STATUS_TERMINATED, w1.GetWorkflowExecutionInfo().GetStatus())

w2, err := c.DescribeWorkflowExecution(context.Background(), "2", "")
s.NoError(err)
s.Equal(enums.WORKFLOW_EXECUTION_STATUS_TERMINATED, w2.GetWorkflowExecutionInfo().GetStatus())

w3, err := c.DescribeWorkflowExecution(context.Background(), "3", "")
s.NoError(err)
s.Equal(enums.WORKFLOW_EXECUTION_STATUS_RUNNING, w3.GetWorkflowExecutionInfo().GetStatus())
}

func checkForEventType(events sdkclient.HistoryEventIterator, eventType enums.EventType) bool {
for events.HasNext() {
event, err := events.Next()
if err != nil {
break
}
if event.GetEventType() == eventType {
return true
}
}
return false
}
Loading

0 comments on commit bbd2306

Please sign in to comment.