Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI Refresh: Implement workflow reset #443

Merged
merged 12 commits into from
Feb 15, 2024
26 changes: 26 additions & 0 deletions temporalcli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ import (
"strings"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

const localHostPort = "127.0.0.1:7233"
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved

func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error) {
clientOptions := client.Options{
HostPort: c.Address,
Expand Down Expand Up @@ -142,6 +147,27 @@ func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.U
)
}

func errorInterceptor() grpc.UnaryClientInterceptor {
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
err := invoker(ctx, method, req, reply, cc, opts...)
err = serviceerror.FromStatus(status.Convert(err))
return err
}
}

func headersProviderInterceptor(provider stringMapHeadersProvider) grpc.UnaryClientInterceptor {
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
headers, err := provider.GetHeaders(ctx)
if err != nil {
return err
}
for k, v := range headers {
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func clientIdentity() string {
hostname, err := os.Hostname()
if err != nil {
Expand Down
26 changes: 23 additions & 3 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,14 @@ func NewTemporalWorkflowQueryCommand(cctx *CommandContext, parent *TemporalWorkf
}

type TemporalWorkflowResetCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowId string
RunId string
EventId int
Reason string
ReapplyType StringEnum
Type StringEnum
}

func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand {
Expand All @@ -685,8 +691,22 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "reset [flags]"
s.Command.Short = "Resets a Workflow Execution by Event ID or reset type."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\x1b[0m\n\n...or a specific any Event after \x1b[1mWorkflowTaskStarted\x1b[0m.\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\x1b[0m\n\nUse the options listed below to change reset behavior."
} else {
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\n```\n\n...or a specific any Event after `WorkflowTaskStarted`.\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\n```\n\nUse the options listed below to change reset behavior."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id.")
s.Command.Flags().IntVarP(&s.EventId, "event-id", "e", 0, "The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "The reason why this workflow is being reset.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "reason")
s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All")
s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Event types to reapply after the reset point. Accepted values: All, Signal, None.")
s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew"}, "")
s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
208 changes: 204 additions & 4 deletions temporalcli/commands.workflow.go
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package temporalcli

import (
"context"
"encoding/json"
"errors"
"fmt"
"os/user"

Expand All @@ -16,6 +18,19 @@ import (
"go.temporal.io/sdk/client"
)

var (
resetTypesMap = map[string]interface{}{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused

"FirstWorkflowTask": "",
"LastWorkflowTask": "",
"LastContinuedAsNew": "",
}
resetReapplyTypesMap = map[string]interface{}{
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
"": enums.RESET_REAPPLY_TYPE_SIGNAL, // default value
"Signal": enums.RESET_REAPPLY_TYPE_SIGNAL,
"None": enums.RESET_REAPPLY_TYPE_NONE,
}
)

func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
Expand Down Expand Up @@ -110,8 +125,57 @@ func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string)
return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
}

func (*TemporalWorkflowResetCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error {
if c.Type.Value == "" && c.EventId <= 0 {
return errors.New("must specify either valid event id or reset type")
}
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

resetBaseRunID := c.RunId
eventID := int64(c.EventId)
if c.Type.Value != "" {
resetBaseRunID, eventID, err = getResetEventIDByType(cctx, c.Type.Value, c.Parent.Namespace, c.WorkflowId, c.RunId, cl.WorkflowService())
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("getting reset event ID by type failed: %w", err)
}
}
reapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL
if c.ReapplyType.Value != "All" {
reapplyType, err = enums.ResetReapplyTypeFromString(c.ReapplyType.Value)
if err != nil {
return err
}
}

if eventID > 0 {
cctx.Printer.Printlnf("Resetting workflow %s to event ID %d", c.WorkflowId, eventID)
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
}

resp, err := cl.ResetWorkflowExecution(cctx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: c.Parent.Namespace,
WorkflowExecution: &common.WorkflowExecution{
WorkflowId: c.WorkflowId,
RunId: resetBaseRunID,
},
Reason: fmt.Sprintf("%s: %s", username(), c.Reason),
WorkflowTaskFinishEventId: eventID,
RequestId: uuid.NewString(),
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
ResetReapplyType: reapplyType,
})
if err != nil {
return fmt.Errorf("failed to reset workflow: %w", err)
}

if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
resp,
printer.StructuredOptions{})
}
return nil
}

func (*TemporalWorkflowResetBatchCommand) run(*CommandContext, []string) error {
Expand Down Expand Up @@ -227,12 +291,16 @@ func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func defaultReason() string {
func username() string {
username := "<unknown-user>"
if u, err := user.Current(); err != nil && u.Username != "" {
username = u.Username
}
return "Requested from CLI by " + username
return username
}

func defaultReason() string {
return "Requested from CLI by " + username()
}

type singleOrBatchOverrides struct {
Expand Down Expand Up @@ -309,3 +377,135 @@ func startBatchJob(cctx *CommandContext, cl client.Client, req *workflowservice.
cctx.Printer.Printlnf("Started batch for job ID: %v", req.JobId)
return nil
}

func getResetEventIDByType(ctx context.Context, resetType, namespace, wid, rid string, wfsvc workflowservice.WorkflowServiceClient) (string, int64, error) {
switch resetType {
case "LastWorkflowTask":
return getLastWorkflowTaskEventID(ctx, namespace, wid, rid, wfsvc)
case "LastContinuedAsNew":
return getLastContinueAsNewID(ctx, namespace, wid, rid, wfsvc)
case "FirstWorkflowTask":
return getFirstWorkflowTaskEventID(ctx, namespace, wid, rid, wfsvc)
default:
return "", -1, fmt.Errorf("invalid reset type: %s", resetType)
}
}

// Returns event id of the last completed task or id of the next event after scheduled task.
func getLastWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, wfsvc workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
resetBaseRunID = rid
req := workflowservice.GetWorkflowExecutionHistoryReverseRequest{
Namespace: namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wid,
RunId: rid,
},
MaximumPageSize: 250,
NextPageToken: nil,
}

for more := true; more; more = len(req.NextPageToken) != 0 {
resp, err := wfsvc.GetWorkflowExecutionHistoryReverse(ctx, &req)
if err != nil {
return "", 0, fmt.Errorf("failed to get workflow execution history: %w", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
workflowTaskEventID = e.GetEventId()
break
} else if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
// if there is no task completed event, set it to first scheduled event + 1
workflowTaskEventID = e.GetEventId() + 1
}
}
tdeebswihart marked this conversation as resolved.
Show resolved Hide resolved
req.NextPageToken = resp.NextPageToken
}
if workflowTaskEventID == 0 {
return "", 0, errors.New("unable to find any scheduled or completed task")
}
return
}

// Returns id of the first workflow task completed event or if it doesn't exist then id of the event after task scheduled event.
func getFirstWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, wfsvc workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) {
resetBaseRunID = rid
req := workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wid,
RunId: rid,
},
MaximumPageSize: 250,
NextPageToken: nil,
}
for more := true; more; more = len(req.NextPageToken) != 0 {
resp, err := wfsvc.GetWorkflowExecutionHistory(ctx, &req)
if err != nil {
return "", 0, fmt.Errorf("failed to get workflow execution history: %w", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
workflowTaskEventID = e.GetEventId()
return resetBaseRunID, workflowTaskEventID, nil
}
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
if workflowTaskEventID == 0 {
workflowTaskEventID = e.GetEventId() + 1
}
}
}
req.NextPageToken = resp.NextPageToken
}
if workflowTaskEventID == 0 {
return "", 0, errors.New("unable to find any scheduled or completed task")
}
return
}

func getLastContinueAsNewID(ctx context.Context, namespace, wid, rid string, wfsvc workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskCompletedID int64, err error) {
// get first event
req := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wid,
RunId: rid,
},
MaximumPageSize: 1,
NextPageToken: nil,
}
resp, err := wfsvc.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return "", 0, fmt.Errorf("failed to get workflow execution history: %w", err)
}
firstEvent := resp.History.Events[0]
resetBaseRunID = firstEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunId()
if resetBaseRunID == "" {
return "", 0, errors.New("cannot use LastContinuedAsNew for workflow; workflow was not continued from another")
}

req = &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wid,
RunId: resetBaseRunID,
},
MaximumPageSize: 250,
NextPageToken: nil,
}
for more := true; more; more = len(req.NextPageToken) != 0 {
resp, err := wfsvc.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return "", 0, fmt.Errorf("failed to get workflow execution history of previous execution (run id %s): %w", resetBaseRunID, err)
}
for _, e := range resp.GetHistory().GetEvents() {
if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
workflowTaskCompletedID = e.GetEventId()
}
}
req.NextPageToken = resp.NextPageToken
}
if workflowTaskCompletedID == 0 {
return "", 0, errors.New("unable to find WorkflowTaskCompleted event for previous execution")
}
return
}
Loading
Loading