From f6758c8abf2732c076eeb13c52616fe7e7d6a6fa Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 16 Feb 2024 16:53:13 -0800 Subject: [PATCH] Enhance Task Queue Describe (#467) --- temporalcli/commands.gen.go | 2 + temporalcli/commands.taskqueue.go | 60 +++++++++++++++++++++++--- temporalcli/commands.taskqueue_test.go | 16 ++++++- temporalcli/commandsmd/commands.md | 1 + 4 files changed, 73 insertions(+), 6 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 4ef85844..ed44b46a 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -487,6 +487,7 @@ type TemporalTaskQueueDescribeCommand struct { Command cobra.Command TaskQueue string TaskQueueType StringEnum + Partitions int } func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalTaskQueueCommand) *TemporalTaskQueueDescribeCommand { @@ -505,6 +506,7 @@ func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalT _ = cobra.MarkFlagRequired(s.Command.Flags(), "task-queue") s.TaskQueueType = NewStringEnum([]string{"workflow", "activity"}, "workflow") s.Command.Flags().Var(&s.TaskQueueType, "task-queue-type", "Task Queue type. Accepted values: workflow, activity.") + s.Command.Flags().IntVar(&s.Partitions, "partitions", 1, "Query for all partitions up to this number (experimental+temporary feature).") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.taskqueue.go b/temporalcli/commands.taskqueue.go index eec40bf8..762b43a2 100644 --- a/temporalcli/commands.taskqueue.go +++ b/temporalcli/commands.taskqueue.go @@ -2,11 +2,15 @@ package temporalcli import ( "fmt" + commonpb "go.temporal.io/api/common/v1" "time" "github.com/fatih/color" "github.com/temporalio/cli/temporalcli/internal/printer" "go.temporal.io/api/enums/v1" + "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/tqname" ) func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []string) error { @@ -25,14 +29,60 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri default: return fmt.Errorf("unrecognized task queue type: %q", c.TaskQueueType.Value) } - resp, err := cl.DescribeTaskQueue(cctx, c.TaskQueue, taskQueueType) + + taskQueueName, err := tqname.FromBaseName(c.TaskQueue) if err != nil { - return fmt.Errorf("failed describing task queue") + return fmt.Errorf("failed to parse task queue name: %w", err) + } + partitions := c.Partitions + + type statusWithPartition struct { + Partition int `json:"partition"` + taskqueue.TaskQueueStatus + } + type pollerWithPartition struct { + Partition int `json:"partition"` + taskqueue.PollerInfo + // copy this out to display nicer in table or card, but not json + Versioning *commonpb.WorkerVersionCapabilities `json:"-"` + } + + var statuses []*statusWithPartition + var pollers []*pollerWithPartition + + // TODO: remove this when the server does partition fan-out + for p := 0; p < partitions; p++ { + resp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{ + Namespace: c.Parent.Namespace, + TaskQueue: &taskqueue.TaskQueue{ + Name: taskQueueName.WithPartition(p).FullName(), + Kind: enums.TASK_QUEUE_KIND_NORMAL, + }, + TaskQueueType: taskQueueType, + IncludeTaskQueueStatus: true, + }) + if err != nil { + return fmt.Errorf("unable to describe task queue: %w", err) + } + statuses = append(statuses, &statusWithPartition{ + Partition: p, + TaskQueueStatus: *resp.TaskQueueStatus, + }) + for _, pi := range resp.Pollers { + pollers = append(pollers, &pollerWithPartition{ + Partition: p, + PollerInfo: *pi, + Versioning: pi.WorkerVersionCapabilities, + }) + } } // For JSON, we'll just dump the proto if cctx.JSONOutput { - return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{}) + return cctx.Printer.PrintStructured(map[string]any{ + "taskQueues": statuses, + "pollers": pollers, + }, printer.StructuredOptions{}) } // For text, we will use a table for pollers @@ -41,8 +91,8 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri Identity string LastAccessTime time.Time RatePerSecond float64 - }, len(resp.Pollers)) - for i, poller := range resp.Pollers { + }, len(pollers)) + for i, poller := range pollers { items[i].Identity = poller.Identity items[i].LastAccessTime = poller.LastAccessTime.AsTime() items[i].RatePerSecond = poller.RatePerSecond diff --git a/temporalcli/commands.taskqueue_test.go b/temporalcli/commands.taskqueue_test.go index 0246595e..ef655166 100644 --- a/temporalcli/commands.taskqueue_test.go +++ b/temporalcli/commands.taskqueue_test.go @@ -39,9 +39,23 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() { ) s.NoError(res.Err) var jsonOut struct { - Pollers []map[string]any `json:"pollers"` + Pollers []map[string]any `json:"pollers"` + TaskQueues []map[string]any `json:"taskQueues"` } s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(1, len(jsonOut.TaskQueues)) // Check identity in the output s.Equal(s.DevServer.Options.ClientOptions.Identity, jsonOut.Pollers[0]["identity"]) + + // Multiple partitions + res = s.Execute( + "task-queue", "describe", + "-o", "json", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--partitions", "10", + ) + s.NoError(res.Err) + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.GreaterOrEqual(10, len(jsonOut.TaskQueues)) } diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index df3d6deb..a79b9916 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -252,6 +252,7 @@ Use the options listed below to modify what this command returns. * `--task-queue`, `-t` (string) - Task queue name. Required. * `--task-queue-type` (string-enum) - Task Queue type. Options: workflow, activity. Default: workflow. +* `--partitions` (int) - Query for all partitions up to this number (experimental+temporary feature). Default: 1. ### temporal workflow: Start, list, and operate on Workflows.