From 81cd5dca201a8ecf187385c1ecf6bd88eeeb98c6 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 10 May 2024 10:33:47 -0700 Subject: [PATCH] Fix QueryRejectCondition parameter in QueryWorkflowWithOptions (#1461) Actually pass through QueryRejectCondition on QueryWorkflowWithOptions --- internal/internal_workflow_client.go | 9 +-- test/integration_test.go | 90 ++++++++++++++++++++++++++++ test/workflow_test.go | 21 +++++++ 3 files changed, 116 insertions(+), 4 deletions(-) diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index cc1322a25..63b060b4b 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -897,10 +897,11 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request } result, err := wc.interceptor.QueryWorkflow(ctx, &ClientQueryWorkflowInput{ - WorkflowID: request.WorkflowID, - RunID: request.RunID, - QueryType: request.QueryType, - Args: request.Args, + WorkflowID: request.WorkflowID, + RunID: request.RunID, + QueryType: request.QueryType, + Args: request.Args, + QueryRejectCondition: request.QueryRejectCondition, }) if err != nil { if err, ok := err.(*queryRejectedError); ok { diff --git a/test/integration_test.go b/test/integration_test.go index 4086c61a6..bffbbed97 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1871,6 +1871,96 @@ func (ts *IntegrationTestSuite) TestCancelChildAndExecuteActivityRace() { ts.NoError(err) } +func (ts *IntegrationTestSuite) TestQueryWorkflowRejectNotOpen() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start workflow + run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-workflow-query-reject-not-open"), + ts.workflows.QueryTestWorkflow) + ts.NoError(err) + + // Query when the workflow is running + queryVal, err := ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE, + }) + ts.NoError(err) + var queryRes string + ts.NoError(queryVal.QueryResult.Get(&queryRes)) + ts.Equal("running", queryRes) + + ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "signal", false)) + ts.NoError(run.Get(ctx, nil)) + + // Query when the workflow is completed + queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY, + }) + ts.NoError(err) + queryRes = "" + ts.NoError(queryVal.QueryResult.Get(&queryRes)) + ts.Equal("completed", queryRes) + + queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, + }) + ts.NoError(err) + ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, queryVal.QueryRejected.Status) +} + +func (ts *IntegrationTestSuite) TestQueryWorkflowRejectNotCompleteCleanly() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start workflow + run, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("test-workflow-query-not-complete-cleanly"), + ts.workflows.QueryTestWorkflow) + ts.NoError(err) + + // Query when the workflow is running + queryVal, err := ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE, + }) + ts.NoError(err) + var queryRes string + ts.NoError(queryVal.QueryResult.Get(&queryRes)) + ts.Equal("running", queryRes) + + ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "signal", true)) + ts.Error(run.Get(ctx, nil)) + + // Query when the workflow is failed + queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, + }) + ts.NoError(err) + ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, queryVal.QueryRejected.Status) + + queryVal, err = ts.client.QueryWorkflowWithOptions(ctx, &client.QueryWorkflowWithOptionsRequest{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + QueryType: "query", + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY, + }) + ts.NoError(err) + ts.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, queryVal.QueryRejected.Status) +} + func (ts *IntegrationTestSuite) TestInterceptorCalls() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/test/workflow_test.go b/test/workflow_test.go index ca1d0a54c..5050fbbf8 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2450,6 +2450,26 @@ func (w *Workflows) SignalCounter(ctx workflow.Context) error { } } +func (w *Workflows) QueryTestWorkflow(ctx workflow.Context) error { + status := "running" + defer func() { + status = "completed" + }() + err := workflow.SetQueryHandler(ctx, "query", func() (string, error) { + return status, nil + }) + if err != nil { + return err + } + signalCh := workflow.GetSignalChannel(ctx, "signal") + var fail bool + signalCh.Receive(ctx, &fail) + if fail { + return errors.New("test failure") + } + return nil +} + func (w *Workflows) PanicOnSignal(ctx workflow.Context) error { // Wait for signal then panic workflow.GetSignalChannel(ctx, "panic-signal").Receive(ctx, nil) @@ -3039,6 +3059,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.UpdateCancelableWorkflow) worker.RegisterWorkflow(w.UpdateHandlerRegisteredLate) worker.RegisterWorkflow(w.LocalActivityNextRetryDelay) + worker.RegisterWorkflow(w.QueryTestWorkflow) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childWithRetryPolicy)