Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DP-1762] - topicctl get actions for under replicated and offline partitions #149

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
)

var getCmd = &cobra.Command{
Expand All @@ -31,7 +32,24 @@ type getCmdConfig struct {

var getConfig getCmdConfig

type urpCmdConfig struct {
topics []string
}

var urpConfig urpCmdConfig

type opCmdConfig struct {
topics []string
}

var opConfig urpCmdConfig
Comment on lines +35 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using abbreviations for under-replicated-partitions and offline-partitions breaks the pattern used everywhere else. Even within these two new commands, it's not consistent: you have urpCmdConfg, underReplicatedPartitionsCmd, GetUnderReplicatedPartitions, urpCommand, FormatURPs, etc.

I realize they are long but I think it's better not to use abbreviations and be more consistent.


func init() {
log.SetFormatter(&prefixed.TextFormatter{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log format and the debug flag should already be getting set by the root command that these subcommands are added to

RootCmd.PersistentFlags().BoolVar(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, we are overriding them, nevermind

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove some duplicate code by adding this to the get subcommands prerun

rootCmd.PersistentPreRun(cmd, args)

spf13/cobra#252 (comment)

TimestampFormat: "2006-01-02 15:04:05",
FullTimestamp: true,
})

getCmd.PersistentFlags().BoolVar(
&getConfig.full,
"full",
Expand All @@ -44,6 +62,12 @@ func init() {
false,
"Sort by value instead of name; only applies for lags at the moment",
)
getCmd.PersistentFlags().BoolVar(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--debug is noted as being a "global flag". Isn't this already set somewhere higher up the stack?

&debug,
"debug",
false,
"enable debug logging",
)
addSharedFlags(getCmd, &getConfig.shared)
getCmd.AddCommand(
balanceCmd(),
Expand All @@ -55,11 +79,16 @@ func init() {
partitionsCmd(),
offsetsCmd(),
topicsCmd(),
underReplicatedPartitionsCmd(),
offlinePartitionsCmd(),
)
RootCmd.AddCommand(getCmd)
}

func getPreRun(cmd *cobra.Command, args []string) error {
if debug {
log.SetLevel(log.DebugLevel)
}
return getConfig.shared.validate()
}

Expand Down Expand Up @@ -234,3 +263,53 @@ func topicsCmd() *cobra.Command {
},
}
}

func underReplicatedPartitionsCmd() *cobra.Command {
urpCommand := &cobra.Command{
Use: "under-replicated-partitions",
Short: "Fetch all under replicated partitions in the kafka cluster.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The term "under-replicated partitions", when output, is inconsistently written. I would suggest "under-replicated partitions" (with the hyphen between under and replicated, and not capitalized) because that is most grammatically correct IMO.

Args: cobra.MinimumNArgs(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want cobra.NoArgs, right?

RunE: func(cmd *cobra.Command, args []string) error {
ctx, cliRunner, err := getCliRunnerAndCtx()
if err != nil {
return err
}
return cliRunner.GetUnderReplicatedPartitions(ctx, urpConfig.topics)
},
}

urpCommand.Flags().StringSliceVarP(
&urpConfig.topics,
"topics",
"t",
[]string{},
"under replicated partitions for the topics",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

under-replicated partitions

)

return urpCommand
}

func offlinePartitionsCmd() *cobra.Command {
opCommand := &cobra.Command{
Use: "offline-partitions",
Short: "Fetch all offline partitions in the kafka cluster.",
Args: cobra.MinimumNArgs(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want cobra.NoArgs, right?

RunE: func(cmd *cobra.Command, args []string) error {
ctx, cliRunner, err := getCliRunnerAndCtx()
if err != nil {
return err
}
return cliRunner.GetOfflinePartitions(ctx, opConfig.topics)
},
}

opCommand.Flags().StringSliceVarP(
&opConfig.topics,
"topics",
"t",
[]string{},
"offline partitions for the topics",
)

return opCommand
}
114 changes: 114 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,3 +884,117 @@ func maxMapValues(inputMap map[int]int) int {

return maxValue
}

// FormatURPs creates a pretty table that lists the details of the
// under replicated topic partitions
func FormatURPs(allTopicURPs []TopicURPsInfo) string {
buf := &bytes.Buffer{}

headers := []string{
"Name",
"Partition",
"Leader",
"ISR",
"Replicas",
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

for _, topicURPs := range allTopicURPs {
for _, topicPartition := range topicURPs.Partitions {
row := []string{
topicURPs.Name,
fmt.Sprintf("%d", topicPartition.ID),
fmt.Sprintf("%d", topicPartition.Leader),
fmt.Sprintf("%+v", topicPartition.ISR),
fmt.Sprintf("%+v", topicPartition.Replicas),
}

table.Append(row)
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatOPs creates a pretty table that lists the details of the
// offline topic partitions
func FormatOPs(allTopicOPs []TopicOPsInfo) string {
buf := &bytes.Buffer{}

headers := []string{
"Name",
"Partition",
"Leader",
"ISR",
"Replicas",
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetColumnAlignment(
[]int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
},
)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

for _, topicOPs := range allTopicOPs {
for _, topicPartition := range topicOPs.Partitions {
topicPartitionIsrs := []int{}
for _, topicPartitionIsr := range topicPartition.Isr {
topicPartitionIsrs = append(topicPartitionIsrs, topicPartitionIsr.ID)
}

topicPartitionReplicas := []int{}
for _, topicPartitionReplica := range topicPartition.Replicas {
topicPartitionReplicas = append(topicPartitionReplicas, topicPartitionReplica.ID)
}

row := []string{
topicOPs.Name,
fmt.Sprintf("%d", topicPartition.ID),
fmt.Sprintf("%d", topicPartition.Leader.ID),
fmt.Sprintf("%+v", topicPartitionIsrs),
fmt.Sprintf("%+v", topicPartitionReplicas),
}

table.Append(row)
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}
11 changes: 11 additions & 0 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/topicctl/pkg/util"
)

Expand Down Expand Up @@ -136,6 +137,16 @@ type zkChangeNotification struct {
EntityPath string `json:"entity_path"`
}

type TopicURPsInfo struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I think about it, do we even need these new types? Couldn't we just reuse TopicInfo and leave the unused fields blank?

Name string `json:"name"`
Partitions []PartitionInfo `json:"partitions"`
}

type TopicOPsInfo struct {
Name string `json:"name"`
Partitions []kafka.Partition `json:"partitions"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be converting []kafka.Partition into []PartitionInfo for consistency

}

// Addr returns the address of the current BrokerInfo.
func (b BrokerInfo) Addr() string {
return fmt.Sprintf("%s:%d", b.Host, b.Port)
Expand Down
Loading
Loading