Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow update client API refactor #1489

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ jobs:
working-directory: test

features-test:
uses: temporalio/features/.github/workflows/go.yaml@main
uses: temporalio/features/.github/workflows/go.yaml@go-sdk-update-refactor
with:
go-repo-path: ${{github.event.pull_request.head.repo.full_name}}
version: ${{github.event.pull_request.head.ref}}
features-repo-ref: go-sdk-update-refactor
version-is-repo-ref: true
39 changes: 25 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ const (
TaskReachabilityClosedWorkflows = internal.TaskReachabilityClosedWorkflows
)

// WorkflowUpdateStage indicates the stage of an update request.
// NOTE: Experimental
type WorkflowUpdateStage = internal.WorkflowUpdateStage

const (
// WorkflowUpdateStageUnspecified indicates the wait stage was not specified
// NOTE: Experimental
WorkflowUpdateStageUnspecified = internal.WorkflowUpdateStageUnspecified
// WorkflowUpdateStageAdmitted indicates the update is admitted
// NOTE: Experimental
WorkflowUpdateStageAdmitted = internal.WorkflowUpdateStageAdmitted
// WorkflowUpdateStageAccepted indicates the update is accepted
// NOTE: Experimental
WorkflowUpdateStageAccepted = internal.WorkflowUpdateStageAccepted
// WorkflowUpdateStageCompleted indicates the update is completed
// NOTE: Experimental
WorkflowUpdateStageCompleted = internal.WorkflowUpdateStageCompleted
)

const (
// DefaultHostPort is the host:port which is used if not passed with options.
DefaultHostPort = internal.LocalHostPort
Expand Down Expand Up @@ -196,10 +215,10 @@ type (
// ScheduleBackfillOptions configure the parameters for backfilling a schedule.
ScheduleBackfillOptions = internal.ScheduleBackfillOptions

// UpdateWorkflowWithOptionsRequest encapsulates the parameters for
// UpdateWorkflowOptions encapsulates the parameters for
// sending an update to a workflow execution.
// WARNING: Worker versioning is currently experimental
UpdateWorkflowWithOptionsRequest = internal.UpdateWorkflowWithOptionsRequest
// NOTE: Experimental
UpdateWorkflowOptions = internal.UpdateWorkflowOptions

// WorkflowUpdateHandle represents a running or completed workflow
// execution update and gives the holder access to the outcome of the same.
Expand Down Expand Up @@ -566,24 +585,16 @@ type (
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// UpdateWorkflow issues an update request to the specified
// workflow execution and returns the result synchronously. Calling this
// function is equivalent to calling UpdateWorkflowWithOptions with
// the same arguments and indicating that the RPC call should wait for
// completion of the update process.
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)

// UpdateWorkflowWithOptions issues an update request to the
// UpdateWorkflow issues an update request to the
// specified workflow execution and returns a handle to the update that
// is running in in parallel with the calling thread. Errors returned
// from the server will be exposed through the return value of
// WorkflowUpdateHandle.Get(). Errors that occur before the
// update is requested (e.g. if the required workflow ID field is
// missing from the UpdateWorkflowWithOptionsRequest) are returned
// missing from the UpdateWorkflowOptions) are returned
// directly from this function call.
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
Expand Down
14 changes: 3 additions & 11 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,24 +356,16 @@ type (
// API. If the check fails, an error is returned.
CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error)

// UpdateWorkflow issues an update request to the specified
// workflow execution and returns the result synchronously. Calling this
// function is equivalent to calling UpdateWorkflowWithOptions with
// the same arguments and indicating that the RPC call should wait for
// completion of the update process.
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, workflowID string, workflowRunID string, updateName string, args ...interface{}) (WorkflowUpdateHandle, error)

// UpdateWorkflowWithOptions issues an update request to the
// UpdateWorkflow issues an update request to the
// specified workflow execution and returns a handle to the update that
// is running in in parallel with the calling thread. Errors returned
// from the server will be exposed through the return value of
// WorkflowExecutionUpdateHandle.Get(). Errors that occur before the
// update is requested (e.g. if the required workflow ID field is
// missing from the UpdateWorkflowWithOptionsRequest) are returned
// missing from the UpdateWorkflowOptions) are returned
// directly from this function call.
// NOTE: Experimental
UpdateWorkflowWithOptions(ctx context.Context, request *UpdateWorkflowWithOptionsRequest) (WorkflowUpdateHandle, error)
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

// GetWorkflowUpdateHandle creates a handle to the referenced update
// which can be polled for an outcome. Note that runID is optional and
Expand Down
13 changes: 11 additions & 2 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ type ClientOutboundInterceptor interface {
// server.
//
// NOTE: Experimental
PollWorkflowUpdate(context.Context, *ClientPollWorkflowUpdateInput) (converter.EncodedValue, error)
PollWorkflowUpdate(context.Context, *ClientPollWorkflowUpdateInput) (*ClientPollWorkflowUpdateOutput, error)

mustEmbedClientOutboundInterceptorBase()
}
Expand All @@ -351,7 +351,7 @@ type ClientUpdateWorkflowInput struct {
Args []interface{}
RunID string
FirstExecutionRunID string
WaitPolicy *updatepb.WaitPolicy
WaitForStage WorkflowUpdateStage
}

// ClientPollWorkflowUpdateInput is the input to
Expand All @@ -360,6 +360,15 @@ type ClientPollWorkflowUpdateInput struct {
UpdateRef *updatepb.UpdateRef
}

// ClientPollWorkflowUpdateOutput is the output to
// ClientOutboundInterceptor.PollWorkflowUpdate.
type ClientPollWorkflowUpdateOutput struct {
// Result is the result of the update, if it has completed successfully.
Result converter.EncodedValue
// Error is the result of a failed update.
Error error
}

// ScheduleClientCreateInput is the input to
// ClientOutboundInterceptor.CreateSchedule.
type ScheduleClientCreateInput struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (c *ClientOutboundInterceptorBase) UpdateWorkflow(
func (c *ClientOutboundInterceptorBase) PollWorkflowUpdate(
ctx context.Context,
in *ClientPollWorkflowUpdateInput,
) (converter.EncodedValue, error) {
) (*ClientPollWorkflowUpdateOutput, error) {
return c.Next.PollWorkflowUpdate(ctx, in)
}

Expand Down
30 changes: 30 additions & 0 deletions internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"reflect"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
updatepb "go.temporal.io/api/update/v1"
Expand All @@ -39,6 +40,20 @@ import (

type updateState string

// WorkflowUpdateStage indicates the stage of an update request.
type WorkflowUpdateStage int

const (
// WorkflowUpdateStageUnspecified indicates the wait stage was not specified
WorkflowUpdateStageUnspecified WorkflowUpdateStage = iota
// WorkflowUpdateStageAdmitted indicates the update is admitted
WorkflowUpdateStageAdmitted
// WorkflowUpdateStageAccepted indicates the update is accepted
WorkflowUpdateStageAccepted
// WorkflowUpdateStageCompleted indicates the update is completed
WorkflowUpdateStageCompleted
)

const (
updateStateNew updateState = "New"
updateStateRequestInitiated updateState = "RequestScheduled"
Expand Down Expand Up @@ -453,3 +468,18 @@ func validateUpdateHandlerFn(fn interface{}) error {
}
return nil
}

func updateLifeCycleStageToProto(l WorkflowUpdateStage) enumspb.UpdateWorkflowExecutionLifecycleStage {
switch l {
case WorkflowUpdateStageUnspecified:
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED
case WorkflowUpdateStageAdmitted:
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED
case WorkflowUpdateStageAccepted:
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
case WorkflowUpdateStageCompleted:
return enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED
default:
panic("unknown update lifecycle stage")
}
}
Loading
Loading