Skip to content

Commit

Permalink
Enhance Task Queue Describe (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Feb 17, 2024
1 parent fee97f7 commit f6758c8
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 6 deletions.
2 changes: 2 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ type TemporalTaskQueueDescribeCommand struct {
Command cobra.Command
TaskQueue string
TaskQueueType StringEnum
Partitions int
}

func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalTaskQueueCommand) *TemporalTaskQueueDescribeCommand {
Expand All @@ -505,6 +506,7 @@ func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalT
_ = cobra.MarkFlagRequired(s.Command.Flags(), "task-queue")
s.TaskQueueType = NewStringEnum([]string{"workflow", "activity"}, "workflow")
s.Command.Flags().Var(&s.TaskQueueType, "task-queue-type", "Task Queue type. Accepted values: workflow, activity.")
s.Command.Flags().IntVar(&s.Partitions, "partitions", 1, "Query for all partitions up to this number (experimental+temporary feature).")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
60 changes: 55 additions & 5 deletions temporalcli/commands.taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package temporalcli

import (
"fmt"
commonpb "go.temporal.io/api/common/v1"
"time"

"github.com/fatih/color"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []string) error {
Expand All @@ -25,14 +29,60 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri
default:
return fmt.Errorf("unrecognized task queue type: %q", c.TaskQueueType.Value)
}
resp, err := cl.DescribeTaskQueue(cctx, c.TaskQueue, taskQueueType)

taskQueueName, err := tqname.FromBaseName(c.TaskQueue)
if err != nil {
return fmt.Errorf("failed describing task queue")
return fmt.Errorf("failed to parse task queue name: %w", err)
}
partitions := c.Partitions

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueue.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueue.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}

var statuses []*statusWithPartition
var pollers []*pollerWithPartition

// TODO: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: c.Parent.Namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueueName.WithPartition(p).FullName(),
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}

// For JSON, we'll just dump the proto
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{})
return cctx.Printer.PrintStructured(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, printer.StructuredOptions{})
}

// For text, we will use a table for pollers
Expand All @@ -41,8 +91,8 @@ func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []stri
Identity string
LastAccessTime time.Time
RatePerSecond float64
}, len(resp.Pollers))
for i, poller := range resp.Pollers {
}, len(pollers))
for i, poller := range pollers {
items[i].Identity = poller.Identity
items[i].LastAccessTime = poller.LastAccessTime.AsTime()
items[i].RatePerSecond = poller.RatePerSecond
Expand Down
16 changes: 15 additions & 1 deletion temporalcli/commands.taskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,23 @@ func (s *SharedServerSuite) TestTaskQueue_Describe_Simple() {
)
s.NoError(res.Err)
var jsonOut struct {
Pollers []map[string]any `json:"pollers"`
Pollers []map[string]any `json:"pollers"`
TaskQueues []map[string]any `json:"taskQueues"`
}
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(1, len(jsonOut.TaskQueues))
// Check identity in the output
s.Equal(s.DevServer.Options.ClientOptions.Identity, jsonOut.Pollers[0]["identity"])

// Multiple partitions
res = s.Execute(
"task-queue", "describe",
"-o", "json",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--partitions", "10",
)
s.NoError(res.Err)
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.GreaterOrEqual(10, len(jsonOut.TaskQueues))
}
1 change: 1 addition & 0 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ Use the options listed below to modify what this command returns.

* `--task-queue`, `-t` (string) - Task queue name. Required.
* `--task-queue-type` (string-enum) - Task Queue type. Options: workflow, activity. Default: workflow.
* `--partitions` (int) - Query for all partitions up to this number (experimental+temporary feature). Default: 1.

### temporal workflow: Start, list, and operate on Workflows.

Expand Down

0 comments on commit f6758c8

Please sign in to comment.