Skip to content

Commit

Permalink
Fix QueryRejectCondition parameter in QueryWorkflowWithOptions (#1461)
Browse files Browse the repository at this point in the history
Actually pass through QueryRejectCondition on QueryWorkflowWithOptions
  • Loading branch information
Quinn-With-Two-Ns committed May 10, 2024
1 parent ded70a3 commit 81cd5dc
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 4 deletions.
9 changes: 5 additions & 4 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,11 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request
}

result, err := wc.interceptor.QueryWorkflow(ctx, &ClientQueryWorkflowInput{
WorkflowID: request.WorkflowID,
RunID: request.RunID,
QueryType: request.QueryType,
Args: request.Args,
WorkflowID: request.WorkflowID,
RunID: request.RunID,
QueryType: request.QueryType,
Args: request.Args,
QueryRejectCondition: request.QueryRejectCondition,
})
if err != nil {
if err, ok := err.(*queryRejectedError); ok {
Expand Down
90 changes: 90 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,6 +1871,96 @@ func (ts *IntegrationTestSuite) TestCancelChildAndExecuteActivityRace() {
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestQueryWorkflowRejectNotOpen() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start workflow
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-workflow-query-reject-not-open"),
ts.workflows.QueryTestWorkflow)
ts.NoError(err)

// Query when the workflow is running
queryVal, err := ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE,
})
ts.NoError(err)
var queryRes string
ts.NoError(queryVal.QueryResult.Get(&queryRes))
ts.Equal("running", queryRes)

ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "signal", false))
ts.NoError(run.Get(ctx, nil))

// Query when the workflow is completed
queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY,
})
ts.NoError(err)
queryRes = ""
ts.NoError(queryVal.QueryResult.Get(&queryRes))
ts.Equal("completed", queryRes)

queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN,
})
ts.NoError(err)
ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, queryVal.QueryRejected.Status)
}

func (ts *IntegrationTestSuite) TestQueryWorkflowRejectNotCompleteCleanly() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start workflow
run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-workflow-query-not-complete-cleanly"),
ts.workflows.QueryTestWorkflow)
ts.NoError(err)

// Query when the workflow is running
queryVal, err := ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE,
})
ts.NoError(err)
var queryRes string
ts.NoError(queryVal.QueryResult.Get(&queryRes))
ts.Equal("running", queryRes)

ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "signal", true))
ts.Error(run.Get(ctx, nil))

// Query when the workflow is failed
queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN,
})
ts.NoError(err)
ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, queryVal.QueryRejected.Status)

queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
QueryType: "query",
QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY,
})
ts.NoError(err)
ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, queryVal.QueryRejected.Status)
}

func (ts *IntegrationTestSuite) TestInterceptorCalls() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
21 changes: 21 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,26 @@ func (w *Workflows) SignalCounter(ctx workflow.Context) error {
}
}

func (w *Workflows) QueryTestWorkflow(ctx workflow.Context) error {
status := "running"
defer func() {
status = "completed"
}()
err := workflow.SetQueryHandler(ctx, "query", func() (string, error) {
return status, nil
})
if err != nil {
return err
}
signalCh := workflow.GetSignalChannel(ctx, "signal")
var fail bool
signalCh.Receive(ctx, &fail)
if fail {
return errors.New("test failure")
}
return nil
}

func (w *Workflows) PanicOnSignal(ctx workflow.Context) error {
// Wait for signal then panic
workflow.GetSignalChannel(ctx, "panic-signal").Receive(ctx, nil)
Expand Down Expand Up @@ -3039,6 +3059,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.UpdateCancelableWorkflow)
worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate)
worker.RegisterWorkflow(w.LocalActivityNextRetryDelay)
worker.RegisterWorkflow(w.QueryTestWorkflow)

worker.RegisterWorkflow(w.child)
worker.RegisterWorkflow(w.childWithRetryPolicy)
Expand Down

0 comments on commit 81cd5dc

Please sign in to comment.