Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supported flattened event detail view on workflow show/execute #615

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2048,7 +2048,7 @@ type TemporalWorkflowExecuteCommand struct {
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
EventDetails bool
Detailed bool
}

func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowExecuteCommand {
Expand All @@ -2066,7 +2066,7 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().BoolVar(&s.EventDetails, "event-details", false, "If set when using text output, include event details JSON in printed output. If set when using JSON output, this will include the entire \"history\" JSON key of the started run (does not follow runs).")
s.Command.Flags().BoolVar(&s.Detailed, "detailed", false, "If set when using text output, display events as sections with detail instead of simple table. If set when using JSON output, this will include the entire \"history\" JSON key of the started run (does not follow runs).")
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
}))
Expand Down Expand Up @@ -2229,8 +2229,8 @@ type TemporalWorkflowShowCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
WorkflowReferenceOptions
Follow bool
EventDetails bool
Follow bool
Detailed bool
}

func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowShowCommand {
Expand All @@ -2247,7 +2247,7 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Args = cobra.NoArgs
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().BoolVarP(&s.Follow, "follow", "f", false, "Follow the progress of a Workflow Execution in real time (does not apply to JSON output).")
s.Command.Flags().BoolVar(&s.EventDetails, "event-details", false, "If set when using text output, include event details JSON in printed output.")
s.Command.Flags().BoolVar(&s.Detailed, "detailed", false, "If set when using text output, display events as sections with detail instead of simple table.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
144 changes: 124 additions & 20 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package temporalcli

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -14,9 +16,8 @@ import (
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/api/temporalproto"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -51,10 +52,10 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
client: cl,
workflowID: run.GetID(),
runID: run.GetRunID(),
includeDetails: c.EventDetails,
includeDetails: c.Detailed,
follow: true,
}
if err := iter.print(cctx.Printer); err != nil && cctx.Err() == nil {
if err := iter.print(cctx); err != nil && cctx.Err() == nil {
return fmt.Errorf("displaying history failed: %w", err)
}
// Separate newline
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *TemporalWorkflowExecuteCommand) printJSONResult(
}

// Build history if requested
if c.EventDetails {
if c.Detailed {
var histProto history.History
iter := client.GetWorkflowHistory(cctx, run.GetID(), run.GetRunID(), false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
Expand Down Expand Up @@ -382,12 +383,40 @@ type structuredHistoryIter struct {
iter client.HistoryEventIterator
}

func (s *structuredHistoryIter) print(p *printer.Printer) error {
options := printer.StructuredOptions{Table: &printer.TableOptions{}}
func (s *structuredHistoryIter) print(cctx *CommandContext) error {
// If we're not including details, just print the streaming table
if !s.includeDetails {
options.ExcludeFields = []string{"Details"}
return cctx.Printer.PrintStructuredTableIter(
structuredHistoryEventType,
s,
printer.StructuredOptions{Table: &printer.TableOptions{}},
)
}

// Since details are wanted, we are going to do each event as a section
first := true
for {
event, err := s.NextRawEvent()
if event == nil || err != nil {
return err
}
// Add blank line if not first
if !first {
cctx.Printer.Println()
}
first = false

// Print section heading
cctx.Printer.Printlnf("--------------- [%v] %v ---------------", event.EventId, event.EventType)
// Convert the event to dot-delimited-field/value and print one per line
fields, err := s.flattenFields(cctx, event)
if err != nil {
return fmt.Errorf("failed flattening event fields: %w", err)
}
for _, field := range fields {
cctx.Printer.Printlnf("%v: %v", field.field, field.value)
}
}
return p.PrintStructuredIter(structuredHistoryEventType, s, options)
}

type structuredHistoryEvent struct {
Expand All @@ -399,8 +428,7 @@ type structuredHistoryEvent struct {
Time string `cli:",width=20"`
// We're going to set width to a semi-reasonable number for good header
// placement, but we expect it to extend past for larger
Type string `cli:",width=26"`
Details string `cli:",width=20"`
Type string `cli:",width=26"`
}

var structuredHistoryEventType = reflect.TypeOf(structuredHistoryEvent{})
Expand All @@ -419,15 +447,6 @@ func (s *structuredHistoryIter) Next() (any, error) {
Time: event.EventTime.AsTime().Format(time.RFC3339),
Type: coloredEventType(event.EventType),
}
if s.includeDetails {
// First field in the attributes
attrs := reflect.ValueOf(event.Attributes).Elem().Field(0).Interface().(proto.Message)
if b, err := protojson.Marshal(attrs); err != nil {
data.Details = "<failed serializing details>"
} else {
data.Details = string(b)
}
}

// Follow continue as new
if attr := event.GetWorkflowExecutionContinuedAsNewEventAttributes(); attr != nil {
Expand Down Expand Up @@ -456,6 +475,91 @@ func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
return event, nil
}

type eventFieldValue struct {
field string
value string
}

func (s *structuredHistoryIter) flattenFields(
cctx *CommandContext,
event *history.HistoryEvent,
) ([]eventFieldValue, error) {
// We want all event fields and all attribute fields converted to the same
// top-level JSON object. First do the proto conversion.
opts := temporalproto.CustomJSONMarshalOptions{}
if cctx.JSONShorthandPayloads {
opts.Metadata = map[string]any{common.EnablePayloadShorthandMetadataKey: true}
}
protoJSON, err := opts.Marshal(event)
if err != nil {
return nil, fmt.Errorf("failed marshaling event: %w", err)
}
// Convert from string back to JSON
dec := json.NewDecoder(bytes.NewReader(protoJSON))
// We want json.Number
dec.UseNumber()
fieldsMap := map[string]any{}
if err := dec.Decode(&fieldsMap); err != nil {
return nil, fmt.Errorf("failed unmarshaling event proto: %w", err)
}
// Exclude eventId and eventType
delete(fieldsMap, "eventId")
delete(fieldsMap, "eventType")
// Lift any "Attributes"-suffixed fields up to the top level
for k, v := range fieldsMap {
if strings.HasSuffix(k, "Attributes") {
subMap, ok := v.(map[string]any)
if !ok {
return nil, fmt.Errorf("unexpectedly invalid attribute map")
}
for subK, subV := range subMap {
fieldsMap[subK] = subV
}
delete(fieldsMap, k)
}
}
// Flatten JSON map and sort
fields, err := s.flattenJSONValue(nil, "", fieldsMap)
if err != nil {
return nil, err
}
sort.Slice(fields, func(i, j int) bool { return fields[i].field < fields[j].field })
return fields, nil
Comment on lines +526 to +527
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For arrays, will sorting do alphabetical sort as opposed to numerical "7" > "10"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might, good catch! I will fix this and add a test for it.

}

func (s *structuredHistoryIter) flattenJSONValue(
to []eventFieldValue,
field string,
value any,
) ([]eventFieldValue, error) {
var err error
switch value := value.(type) {
case bool, string, json.Number, nil:
// Note, empty values should not occur
to = append(to, eventFieldValue{field, fmt.Sprintf("%v", value)})
case []any:
for i, subValue := range value {
if to, err = s.flattenJSONValue(to, fmt.Sprintf("%v[%v]", field, i), subValue); err != nil {
return nil, err
}
}
case map[string]any:
// Only add a dot if existing field not empty (i.e. not first)
prefix := field
if prefix != "" {
prefix += "."
}
for subField, subValue := range value {
if to, err = s.flattenJSONValue(to, prefix+subField, subValue); err != nil {
return nil, err
}
}
default:
return nil, fmt.Errorf("failed converting field %v, unknown type %T", field, value)
}
return to, nil
}

func isWorkflowTerminatingEvent(t enums.EventType) bool {
switch t {
case enums.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
Expand Down
2 changes: 1 addition & 1 deletion temporalcli/commands.workflow_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func (s *SharedServerSuite) TestWorkflow_Execute_CodecEndpoint() {
// actually decoded for the user
res = s.Execute(
"workflow", "execute",
"-o", "json", "--event-details",
"-o", "json", "--detailed",
"--codec-endpoint", "http://"+srv.Listener.Addr().String(),
"--address", s.Address(),
"--task-queue", taskQueue,
Expand Down
8 changes: 4 additions & 4 deletions temporalcli/commands.workflow_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []strin
return nil
}

func (c *TemporalWorkflowListCommand) run(cctx *CommandContext, args []string) error {
func (c *TemporalWorkflowListCommand) run(cctx *CommandContext, _ []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
Expand Down Expand Up @@ -268,7 +268,7 @@ func (c *TemporalWorkflowListCommand) pageFetcher(
}
}

func (c *TemporalWorkflowCountCommand) run(cctx *CommandContext, _ []string) error {
func (c *TemporalWorkflowCountCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
Expand Down Expand Up @@ -329,12 +329,12 @@ func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) erro
client: cl,
workflowID: c.WorkflowId,
runID: c.RunId,
includeDetails: c.EventDetails,
includeDetails: c.Detailed,
follow: c.Follow,
}
if !cctx.JSONOutput {
cctx.Printer.Println(color.MagentaString("Progress:"))
if err := iter.print(cctx.Printer); err != nil {
if err := iter.print(cctx); err != nil {
return fmt.Errorf("displaying history failed: %w", err)
}
cctx.Printer.Println()
Expand Down
26 changes: 16 additions & 10 deletions temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *SharedServerSuite) TestWorkflow_Show_Follow() {
s.testWorkflowShowFollow(false)
}

func (s *SharedServerSuite) testWorkflowShowFollow(eventDetails bool) {
func (s *SharedServerSuite) testWorkflowShowFollow(detailed bool) {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigs := 0
for {
Expand Down Expand Up @@ -280,8 +280,8 @@ func (s *SharedServerSuite) testWorkflowShowFollow(eventDetails bool) {
"--address", s.Address(),
"-w", run.GetID(),
"--follow"}
if eventDetails {
args = append(args, "--event-details")
if detailed {
args = append(args, "--detailed")
}
res := s.Execute(args...)
outputCh <- res
Expand All @@ -296,18 +296,24 @@ func (s *SharedServerSuite) testWorkflowShowFollow(eventDetails bool) {
res := <-outputCh
s.NoError(res.Err)
output := res.Stdout.String()
if eventDetails {
s.Contains(output, "my-signal")
}
// Confirm result present
s.ContainsOnSameLine(output, "Result", `"hi!"`)
s.NoError(run.Get(s.Context, nil))

// Detailed uses sections, non-detailed uses table
if detailed {
s.Contains(output, "input[0]: ignored")
s.Contains(output, "signalName: my-signal")
} else {
s.Contains(output, "WorkflowExecutionSignaled")
}
Comment on lines +307 to +309
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would show up in both output kinds, right? Non-detailed on the line, and detailed in the section header.

Copy link
Member Author

@cretz cretz Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but before this the test tested nothing about non-detailed event output, so I just thought I'd add it in an else for code clarity reasons (I can do better assertions on detailed). But I can move it out of else.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'd just assert it either way

}

func (s *SharedServerSuite) TestWorkflow_Show_NoFollow() {
s.testWorkflowShowNoFollow(true)
s.testWorkflowShowNoFollow(false)
}
func (s *SharedServerSuite) testWorkflowShowNoFollow(eventDetails bool) {
func (s *SharedServerSuite) testWorkflowShowNoFollow(detailed bool) {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
sigs := 0
for {
Expand All @@ -332,8 +338,8 @@ func (s *SharedServerSuite) testWorkflowShowNoFollow(eventDetails bool) {
args := []string{"workflow", "show",
"--address", s.Address(),
"-w", run.GetID()}
if eventDetails {
args = append(args, "--event-details")
if detailed {
args = append(args, "--detailed")
}
res := s.Execute(args...)
s.NoError(res.Err)
Expand All @@ -349,7 +355,7 @@ func (s *SharedServerSuite) testWorkflowShowNoFollow(eventDetails bool) {
res = s.Execute(args...)
s.NoError(res.Err)
out = res.Stdout.String()
if eventDetails {
if detailed {
s.Contains(out, "my-signal")
}
s.ContainsOnSameLine(out, "Result", `"hi!"`)
Expand Down
7 changes: 4 additions & 3 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,9 @@ temporal workflow execute

#### Options

* `--event-details` (bool) - If set when using text output, include event details JSON in printed output. If set when
using JSON output, this will include the entire "history" JSON key of the started run (does not follow runs).
* `--detailed` (bool) - If set when using text output, display events as sections with detail instead of simple table.
If set when using JSON output, this will include the entire "history" JSON key of the started run (does not follow
runs).

Includes options set for [shared workflow start](#options-set-for-shared-workflow-start).
Includes options set for [workflow start](#options-set-for-workflow-start).
Expand Down Expand Up @@ -1054,7 +1055,7 @@ Use the options listed below to change the command's behavior.

* `--follow`, `-f` (bool) - Follow the progress of a Workflow Execution in real time (does not apply
to JSON output).
* `--event-details` (bool) - If set when using text output, include event details JSON in printed output.
* `--detailed` (bool) - If set when using text output, display events as sections with detail instead of simple table.

Includes options set for [workflow reference](#options-set-for-workflow-reference).

Expand Down
Loading
Loading