Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workflow stack command #457

Merged
merged 5 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,16 +818,25 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
type TemporalWorkflowStackCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
RejectCondition StringEnum
}

func NewTemporalWorkflowStackCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowStackCommand {
var s TemporalWorkflowStackCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "stack [flags]"
s.Command.Short = "Query a Workflow Execution with __stack_trace as the query type."
s.Command.Long = "TODO"
s.Command.Short = "Query a Workflow Execution for its stack trace."
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow stack\x1b[0m command Queries a\nWorkflow Execution with \x1b[1m__stack_trace\x1b[0m as the query type.\nThis returns a stack trace of all the threads or routines currently used by the workflow, and is\nuseful for troubleshooting.\n\n\x1b[1mtemporal workflow stack --workflow-id MyWorkflowId\x1b[0m\n\nUse the options listed below to change the command's behavior."
} else {
s.Command.Long = "The `temporal workflow stack` command Queries a\nWorkflow Execution with `__stack_trace` as the query type.\nThis returns a stack trace of all the threads or routines currently used by the workflow, and is\nuseful for troubleshooting.\n\n```\ntemporal workflow stack --workflow-id MyWorkflowId\n```\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.RejectCondition = NewStringEnum([]string{"not_open", "not_completed_cleanly"}, "")
s.Command.Flags().Var(&s.RejectCondition, "reject-condition", "Optional flag for rejecting Queries based on Workflow state. Accepted values: not_open, not_completed_cleanly.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
128 changes: 70 additions & 58 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/query/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"

"github.com/temporalio/cli/temporalcli/internal/printer"
)

func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -55,60 +54,8 @@ func (*TemporalWorkflowDeleteCommand) run(*CommandContext, []string) error {
}

func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get input payloads
input, err := c.buildRawInputPayloads()
if err != nil {
return err
}

queryRejectCond := enums.QUERY_REJECT_CONDITION_UNSPECIFIED
switch c.RejectCondition.Value {
case "":
case "not_open":
queryRejectCond = enums.QUERY_REJECT_CONDITION_NOT_OPEN
case "not_completed_cleanly":
queryRejectCond = enums.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY
default:
return fmt.Errorf("invalid query reject condition: %v, valid values are: 'not_open', 'not_completed_cleanly'", c.RejectCondition)
}

result, err := cl.WorkflowService().QueryWorkflow(cctx, &workflowservice.QueryWorkflowRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{WorkflowId: c.WorkflowId, RunId: c.RunId},
Query: &query.WorkflowQuery{
QueryType: c.Type,
QueryArgs: input,
},
QueryRejectCondition: queryRejectCond,
})

if err != nil {
return fmt.Errorf("querying workflow failed: %w", err)
}

if result.QueryRejected != nil {
return fmt.Errorf("query was rejected, workflow has status: %v\n", result.QueryRejected.GetStatus())
}

if cctx.JSONOutput {
return cctx.Printer.PrintStructured(result, printer.StructuredOptions{})
}

cctx.Printer.Println(color.MagentaString("Query result:"))
output := struct {
QueryResult json.RawMessage `cli:",cardOmitEmpty"`
}{}
output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult)
if err != nil {
return fmt.Errorf("failed to marshal query result: %w", err)
}
return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
return queryHelper(cctx, c.Parent, c.PayloadInputOptions,
c.Type, c.RejectCondition, c.WorkflowReferenceOptions)
}

func (*TemporalWorkflowResetBatchCommand) run(*CommandContext, []string) error {
Expand Down Expand Up @@ -163,8 +110,9 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
return nil
}

func (*TemporalWorkflowStackCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowStackCommand) run(cctx *CommandContext, args []string) error {
return queryHelper(cctx, c.Parent, PayloadInputOptions{},
"__stack_trace", c.RejectCondition, c.WorkflowReferenceOptions)
}

func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) error {
Expand Down Expand Up @@ -347,3 +295,67 @@ func startBatchJob(cctx *CommandContext, cl client.Client, req *workflowservice.
cctx.Printer.Printlnf("Started batch for job ID: %v", req.JobId)
return nil
}

func queryHelper(cctx *CommandContext,
parent *TemporalWorkflowCommand,
inputOpts PayloadInputOptions,
queryType string,
rejectCondition StringEnum,
execution WorkflowReferenceOptions,
) error {
cl, err := parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get input payloads
input, err := inputOpts.buildRawInputPayloads()
if err != nil {
return err
}

queryRejectCond := enums.QUERY_REJECT_CONDITION_UNSPECIFIED
switch rejectCondition.Value {
case "":
case "not_open":
queryRejectCond = enums.QUERY_REJECT_CONDITION_NOT_OPEN
case "not_completed_cleanly":
queryRejectCond = enums.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY
default:
return fmt.Errorf("invalid query reject condition: %v, valid values are: 'not_open', 'not_completed_cleanly'", rejectCondition)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory with how string-enum works this should never get this far, but I support the default anyways

}

result, err := cl.WorkflowService().QueryWorkflow(cctx, &workflowservice.QueryWorkflowRequest{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned in other review, and don't have to change, but this could use the high-level QueryWorkflowWithOptions (changing buildRawInputPayloads to buildRawInput), but no big difference.

Namespace: parent.Namespace,
Execution: &common.WorkflowExecution{WorkflowId: execution.WorkflowId, RunId: execution.RunId},
Query: &query.WorkflowQuery{
QueryType: queryType,
QueryArgs: input,
},
QueryRejectCondition: queryRejectCond,
})

if err != nil {
return fmt.Errorf("querying workflow failed: %w", err)
}

if result.QueryRejected != nil {
return fmt.Errorf("query was rejected, workflow has status: %v", result.QueryRejected.GetStatus())
}

if cctx.JSONOutput {
return cctx.Printer.PrintStructured(result, printer.StructuredOptions{})
}

cctx.Printer.Println(color.MagentaString("Query result:"))
output := struct {
QueryResult json.RawMessage `cli:",cardOmitEmpty"`
}{}
output.QueryResult, err = cctx.MarshalFriendlyJSONPayloads(result.QueryResult)
if err != nil {
return fmt.Errorf("failed to marshal query result: %w", err)
}

return cctx.Printer.PrintStructured(output, printer.StructuredOptions{})
}
66 changes: 66 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,69 @@ func (s *SharedServerSuite) testQueryWorkflow(json bool) {
s.Error(res.Err)
s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed")
}

func (s *SharedServerSuite) TestWorkflow_Stack_SingleWorkflowSuccess() {
s.testStackWorkflow(false)
}

func (s *SharedServerSuite) TestWorkflow_Stack_SingleWorkflowSuccessJSON() {
s.testStackWorkflow(true)
}

func (s *SharedServerSuite) testStackWorkflow(json bool) {
// Make workflow wait for signal and then return it
s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
done := false
workflow.Go(ctx, func(ctx workflow.Context) {
_ = workflow.Await(ctx, func() bool {
return done
})
})
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
done = true
return nil, nil

})

// Start the workflow
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)

args := []string{
"workflow", "stack",
"--address", s.Address(),
"-w", run.GetID(),
}
if json {
args = append(args, "-o", "json")
}
// Do the query
res := s.Execute(args...)
s.NoError(res.Err)
s.Contains(res.Stdout.String(), "coroutine root")
s.Contains(res.Stdout.String(), "coroutine 2")

// Unblock the workflow with a signal
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))

s.NoError(run.Get(s.Context, nil))

// Ensure query is rejected when using not open rejection condition
args = []string{
"workflow", "stack",
"--address", s.Address(),
"-w", run.GetID(),
"--reject-condition", "not_open",
}
if json {
args = append(args, "-o", "json")
}
res = s.Execute(args...)
s.Error(res.Err)
s.Contains(res.Err.Error(), "query was rejected, workflow has status: Completed")
}
20 changes: 18 additions & 2 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,25 @@ Includes options set for [payload input](#options-set-for-payload-input).
otherwise. Defaults to message with the current user's name.
* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.

### temporal workflow stack: Query a Workflow Execution with __stack_trace as the query type.
### temporal workflow stack: Query a Workflow Execution for its stack trace.

TODO
The `temporal workflow stack` command [Queries](/concepts/what-is-a-query) a
[Workflow Execution](/concepts/what-is-a-workflow-execution) with `__stack_trace` as the query type.
This returns a stack trace of all the threads or routines currently used by the workflow, and is
useful for troubleshooting.

```
temporal workflow stack --workflow-id MyWorkflowId
```

Use the options listed below to change the command's behavior.

#### Options

* `--reject-condition` (string-enum) - Optional flag for rejecting Queries based on Workflow state.
Options: not_open, not_completed_cleanly.

Includes options set for [workflow reference](#options-set-for-workflow-reference).

### temporal workflow start: Starts a new Workflow Execution.

Expand Down
Loading