Skip to content

Commit

Permalink
Make workflow show output json usable for replay (#110)
Browse files Browse the repository at this point in the history
* Make workflow show output json good for replay

* Use urfave context for printing pretty json

* Address comment

* .

* Update workflow/workflow_commands.go

Co-authored-by: Alex Shtin <alex@shtin.com>

---------

Co-authored-by: Alex Shtin <alex@shtin.com>
  • Loading branch information
feedmeapples and alexshtin committed Mar 29, 2023
1 parent 9956773 commit 48b50b2
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 45 deletions.
13 changes: 8 additions & 5 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func GetCurrentUserFromEnv() string {
return "unknown"
}

func PrettyPrintJSONObject(o interface{}) {
func PrettyPrintJSONObject(c *cli.Context, o interface{}) {
var b []byte
var err error
if pb, ok := o.(proto.Message); ok {
Expand All @@ -179,12 +179,15 @@ func PrettyPrintJSONObject(o interface{}) {
b, err = json.MarshalIndent(o, "", " ")
}

w := c.App.Writer

if err != nil {
fmt.Printf("Error when try to print pretty: %v", err)
fmt.Println(o)
fmt.Fprintf(w, "Error when try to print pretty: %v", err)
fmt.Fprintln(w, o)
}
_, _ = os.Stdout.Write(b)
fmt.Println()

_, _ = w.Write(b)
fmt.Fprintln(w)
}

func RequiredFlag(c *cli.Context, optionName string) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.8.2
github.com/temporalio/tctl-kit v0.0.0-20230104170414-10932650d727
github.com/temporalio/tctl-kit v0.0.0-20230328153839-577f95d16fa0
github.com/temporalio/ui-server/v2 v2.13.0
github.com/urfave/cli/v2 v2.23.6
go.temporal.io/api v1.18.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -812,8 +812,8 @@ github.com/temporalio/ringpop-go v0.0.0-20220818230611-30bf23b490b2 h1:QIwUh2HCt
github.com/temporalio/ringpop-go v0.0.0-20220818230611-30bf23b490b2/go.mod h1:ZEYrWwPO7607ZEaPzK7nWRv55cIrTtH4TeBBu3V532U=
github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b h1:Fs3LdlF7xbnOWHymbFmvIEuxIEt1dNRCfaDkoajSaZk=
github.com/temporalio/tchannel-go v1.22.1-0.20220818200552-1be8d8cffa5b/go.mod h1:c+V9Z/ZgkzAdyGvHrvC5AsXgN+M9Qwey04cBdKYzV7U=
github.com/temporalio/tctl-kit v0.0.0-20230104170414-10932650d727 h1:Yrisr5sO+sPzc2ATX4LS8K7iM1L1ww71RIbZk8N240Q=
github.com/temporalio/tctl-kit v0.0.0-20230104170414-10932650d727/go.mod h1:hk/LJCKZNNmtVSWRKepbdUJme+k/4fb/hPkekXk40sk=
github.com/temporalio/tctl-kit v0.0.0-20230328153839-577f95d16fa0 h1:E1iAre7/4VvSJri8uOnItKVsMKnP+WEQourm+zVO0cc=
github.com/temporalio/tctl-kit v0.0.0-20230328153839-577f95d16fa0/go.mod h1:hk/LJCKZNNmtVSWRKepbdUJme+k/4fb/hPkekXk40sk=
github.com/temporalio/ui-server/v2 v2.13.0 h1:aKurAPeskgkLYG+1GLm2Cb/1qkoL5wg0gjc776FhRw0=
github.com/temporalio/ui-server/v2 v2.13.0/go.mod h1:8yI8soutsbGEKyOblUZeuo1CPgl4U43+yVYnUrIiNto=
github.com/twmb/murmur3 v1.1.5/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
Expand Down
11 changes: 5 additions & 6 deletions schedule/schedule_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/temporalio/tctl-kit/pkg/pager"
"github.com/urfave/cli/v2"
apicommon "go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
schedpb "go.temporal.io/api/schedule/v1"
Expand Down Expand Up @@ -132,7 +131,7 @@ func buildScheduleAction(c *cli.Context) (*schedpb.ScheduleAction, error) {

newWorkflow := &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: wid,
WorkflowType: &apicommon.WorkflowType{Name: workflowType},
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueue.TaskQueue{Name: taskQueue},
Input: inputs,
WorkflowExecutionTimeout: timestamp.DurationPtr(time.Second * time.Duration(et)),
Expand Down Expand Up @@ -202,7 +201,7 @@ func buildSchedule(c *cli.Context) (*schedpb.Schedule, error) {
return sched, nil
}

func getMemoAndSearchAttributesForSchedule(c *cli.Context) (*apicommon.Memo, *apicommon.SearchAttributes, error) {
func getMemoAndSearchAttributesForSchedule(c *cli.Context) (*commonpb.Memo, *commonpb.SearchAttributes, error) {
if memoMap, err := workflow.UnmarshalMemoFromCLI(c); err != nil {
return nil, nil, err
} else if memo, err := encodeMemo(memoMap); err != nil {
Expand Down Expand Up @@ -416,7 +415,7 @@ func DescribeSchedule(c *cli.Context) error {
}

if c.Bool(common.FlagPrintRaw) {
common.PrettyPrintJSONObject(resp)
common.PrettyPrintJSONObject(c, resp)
return nil
}

Expand Down Expand Up @@ -446,7 +445,7 @@ func DescribeSchedule(c *cli.Context) error {
// more convenient copies of values from Info
NextRunTime *time.Time
LastRunTime *time.Time
LastRunExecution *apicommon.WorkflowExecution
LastRunExecution *commonpb.WorkflowExecution
LastRunActualTime *time.Time

Memo map[string]string // json only
Expand Down Expand Up @@ -577,7 +576,7 @@ func ListSchedules(c *cli.Context) error {
Info struct {
NextRunTime *time.Time
LastRunTime *time.Time
LastRunExecution *apicommon.WorkflowExecution
LastRunExecution *commonpb.WorkflowExecution
LastRunActualTime *time.Time
}
}
Expand Down
82 changes: 51 additions & 31 deletions workflow/workflow_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/temporalio/cli/common/stringify"
"github.com/temporalio/cli/dataconverter"
"github.com/temporalio/tctl-kit/pkg/color"
"github.com/temporalio/tctl-kit/pkg/iterator"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/temporalio/tctl-kit/pkg/pager"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -238,28 +239,32 @@ func UnmarshalMemoFromCLI(c *cli.Context) (map[string]interface{}, error) {
return memo, nil
}

type historyIterator struct {
iter interface {
HasNext() bool
Next() (*historypb.HistoryEvent, error)
}
// historyTableIter adapts history iterator for Table output view
type historyTableIter struct {
iter iterator.Iterator[*historypb.HistoryEvent]
maxFieldLength int
lastEvent *historypb.HistoryEvent
wfResult *historypb.HistoryEvent
}

func (h *historyIterator) HasNext() bool {
func (h *historyTableIter) HasNext() bool {
return h.iter.HasNext()
}

func (h *historyIterator) Next() (interface{}, error) {
func (h *historyTableIter) Next() (interface{}, error) {
event, err := h.iter.Next()
if err != nil {
return nil, err
}

reflect.ValueOf(h.lastEvent).Elem().Set(reflect.ValueOf(event).Elem())
reflect.ValueOf(h.wfResult).Elem().Set(reflect.ValueOf(event).Elem())

return eventRow{
// adapted structure for Table output view
return struct {
ID string
Time string
Type string
Details string
}{
ID: convert.Int64ToString(event.GetEventId()),
Time: common.FormatTime(timestamp.TimeValue(event.GetEventTime()), false),
Type: common.ColorEvent(event),
Expand Down Expand Up @@ -292,18 +297,23 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error {
fmt.Println(color.Magenta(c, "Progress:"))
}

var lastEvent historypb.HistoryEvent // used for print result of this run
var wfResult historypb.HistoryEvent

po := &output.PrintOptions{
Fields: []string{"ID", "Time", "Type"},
FieldsLong: []string{"Details"},
Pager: pager.Less,
}
errChan := make(chan error)
go func() {
hIter := sdkClient.GetWorkflowHistory(tcCtx, wid, rid, watch, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
iter := &historyIterator{iter: hIter, maxFieldLength: maxFieldLength, lastEvent: &lastEvent}
err = output.PrintIterator(c, iter, po)
iter := sdkClient.GetWorkflowHistory(tcCtx, wid, rid, watch, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
if isJSON {
printReplayableHistory(c, iter)
} else {
hIter := &historyTableIter{iter: iter, maxFieldLength: maxFieldLength, wfResult: &wfResult}
po := &output.PrintOptions{
Fields: []string{"ID", "Time", "Type"},
FieldsLong: []string{"Details"},
Pager: pager.Less,
}
err = output.PrintIterator(c, hIter, po)
}

if err != nil {
errChan <- err
return
Expand Down Expand Up @@ -334,7 +344,7 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error {
if watch {
fmt.Printf(" Run Time: %d seconds\n", timeElapsed)
}
printRunStatus(c, &lastEvent)
printRunStatus(c, &wfResult)
}
return nil
case err = <-errChan:
Expand All @@ -343,6 +353,24 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error {
}
}

func printReplayableHistory(c *cli.Context, iter iterator.Iterator[*historypb.HistoryEvent]) error {
var events []*historypb.HistoryEvent
for iter.HasNext() {
event, err := iter.Next()
if err != nil {
return err

}
events = append(events, event)
}

history := &historypb.History{Events: events}

common.PrettyPrintJSONObject(c, history)

return nil
}

func TerminateWorkflow(c *cli.Context) error {
if c.String(common.FlagQuery) != "" {
return batch.BatchTerminate(c)
Expand Down Expand Up @@ -661,9 +689,9 @@ func DescribeWorkflow(c *cli.Context) error {
}

if printRaw {
common.PrettyPrintJSONObject(resp)
common.PrettyPrintJSONObject(c, resp)
} else {
common.PrettyPrintJSONObject(convertDescribeWorkflowExecutionResponse(c, resp))
common.PrettyPrintJSONObject(c, convertDescribeWorkflowExecutionResponse(c, resp))
}

return nil
Expand Down Expand Up @@ -800,7 +828,6 @@ func printRunStatus(c *cli.Context, event *historypb.HistoryEvent) {
func ShowHistory(c *cli.Context) error {
wid := c.String(common.FlagWorkflowID)
rid := c.String(common.FlagRunID)

follow := c.Bool(output.FlagFollow)

return printWorkflowProgress(c, wid, rid, follow)
Expand Down Expand Up @@ -862,7 +889,7 @@ func ResetWorkflow(c *cli.Context) error {
if err != nil {
return fmt.Errorf("reset failed: %w", err)
}
common.PrettyPrintJSONObject(resp)
common.PrettyPrintJSONObject(c, resp)
return nil
}

Expand Down Expand Up @@ -1418,13 +1445,6 @@ func TraceWorkflow(c *cli.Context) error {
return nil
}

type eventRow struct {
ID string
Time string
Type string
Details string
}

// this only works for ANSI terminal, which means remove existing lines won't work if users redirect to file
// ref: https://en.wikipedia.org/wiki/ANSI_escape_code
func removePrevious2LinesFromTerminal() {
Expand Down

0 comments on commit 48b50b2

Please sign in to comment.