diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index dd1a4c28..3c9c1769 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -8,6 +8,7 @@ import ( "github.com/segmentio/topicctl/pkg/cli" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + prefixed "github.com/x-cray/logrus-prefixed-formatter" ) var getCmd = &cobra.Command{ @@ -31,7 +32,24 @@ type getCmdConfig struct { var getConfig getCmdConfig +type urpCmdConfig struct { + topics []string +} + +var urpConfig urpCmdConfig + +type opCmdConfig struct { + topics []string +} + +var opConfig urpCmdConfig + func init() { + log.SetFormatter(&prefixed.TextFormatter{ + TimestampFormat: "2006-01-02 15:04:05", + FullTimestamp: true, + }) + getCmd.PersistentFlags().BoolVar( &getConfig.full, "full", @@ -44,6 +62,12 @@ func init() { false, "Sort by value instead of name; only applies for lags at the moment", ) + getCmd.PersistentFlags().BoolVar( + &debug, + "debug", + false, + "enable debug logging", + ) addSharedFlags(getCmd, &getConfig.shared) getCmd.AddCommand( balanceCmd(), @@ -55,11 +79,16 @@ func init() { partitionsCmd(), offsetsCmd(), topicsCmd(), + underReplicatedPartitionsCmd(), + offlinePartitionsCmd(), ) RootCmd.AddCommand(getCmd) } func getPreRun(cmd *cobra.Command, args []string) error { + if debug { + log.SetLevel(log.DebugLevel) + } return getConfig.shared.validate() } @@ -234,3 +263,53 @@ func topicsCmd() *cobra.Command { }, } } + +func underReplicatedPartitionsCmd() *cobra.Command { + urpCommand := &cobra.Command{ + Use: "under-replicated-partitions", + Short: "Fetch all under replicated partitions in the kafka cluster.", + Args: cobra.MinimumNArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cliRunner, err := getCliRunnerAndCtx() + if err != nil { + return err + } + return cliRunner.GetUnderReplicatedPartitions(ctx, urpConfig.topics) + }, + } + + urpCommand.Flags().StringSliceVarP( + &urpConfig.topics, + "topics", + "t", + []string{}, + "under replicated partitions for the topics", + ) + + return urpCommand +} + +func offlinePartitionsCmd() *cobra.Command { + opCommand := &cobra.Command{ + Use: "offline-partitions", + Short: "Fetch all offline partitions in the kafka cluster.", + Args: cobra.MinimumNArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cliRunner, err := getCliRunnerAndCtx() + if err != nil { + return err + } + return cliRunner.GetOfflinePartitions(ctx, opConfig.topics) + }, + } + + opCommand.Flags().StringSliceVarP( + &opConfig.topics, + "topics", + "t", + []string{}, + "offline partitions for the topics", + ) + + return opCommand +} diff --git a/pkg/admin/format.go b/pkg/admin/format.go index c6b9c64f..e5375d48 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -884,3 +884,117 @@ func maxMapValues(inputMap map[int]int) int { return maxValue } + +// FormatURPs creates a pretty table that lists the details of the +// under replicated topic partitions +func FormatURPs(allTopicURPs []TopicURPsInfo) string { + buf := &bytes.Buffer{} + + headers := []string{ + "Name", + "Partition", + "Leader", + "ISR", + "Replicas", + } + + table := tablewriter.NewWriter(buf) + table.SetHeader(headers) + table.SetAutoWrapText(false) + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + for _, topicURPs := range allTopicURPs { + for _, topicPartition := range topicURPs.Partitions { + row := []string{ + topicURPs.Name, + fmt.Sprintf("%d", topicPartition.ID), + fmt.Sprintf("%d", topicPartition.Leader), + fmt.Sprintf("%+v", topicPartition.ISR), + fmt.Sprintf("%+v", topicPartition.Replicas), + } + + table.Append(row) + } + } + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + +// FormatOPs creates a pretty table that lists the details of the +// offline topic partitions +func FormatOPs(allTopicOPs []TopicOPsInfo) string { + buf := &bytes.Buffer{} + + headers := []string{ + "Name", + "Partition", + "Leader", + "ISR", + "Replicas", + } + + table := tablewriter.NewWriter(buf) + table.SetHeader(headers) + table.SetAutoWrapText(false) + table.SetColumnAlignment( + []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + }, + ) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + for _, topicOPs := range allTopicOPs { + for _, topicPartition := range topicOPs.Partitions { + topicPartitionIsrs := []int{} + for _, topicPartitionIsr := range topicPartition.Isr { + topicPartitionIsrs = append(topicPartitionIsrs, topicPartitionIsr.ID) + } + + topicPartitionReplicas := []int{} + for _, topicPartitionReplica := range topicPartition.Replicas { + topicPartitionReplicas = append(topicPartitionReplicas, topicPartitionReplica.ID) + } + + row := []string{ + topicOPs.Name, + fmt.Sprintf("%d", topicPartition.ID), + fmt.Sprintf("%d", topicPartition.Leader.ID), + fmt.Sprintf("%+v", topicPartitionIsrs), + fmt.Sprintf("%+v", topicPartitionReplicas), + } + + table.Append(row) + } + } + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} diff --git a/pkg/admin/types.go b/pkg/admin/types.go index 0221a39b..f7863bb5 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/util" ) @@ -136,6 +137,16 @@ type zkChangeNotification struct { EntityPath string `json:"entity_path"` } +type TopicURPsInfo struct { + Name string `json:"name"` + Partitions []PartitionInfo `json:"partitions"` +} + +type TopicOPsInfo struct { + Name string `json:"name"` + Partitions []kafka.Partition `json:"partitions"` +} + // Addr returns the address of the current BrokerInfo. func (b BrokerInfo) Addr() string { return fmt.Sprintf("%s:%d", b.Host, b.Port) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 7b0542c7..f62b33ca 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -14,6 +14,7 @@ import ( "github.com/briandowns/spinner" "github.com/fatih/color" + "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/check" @@ -487,6 +488,138 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error { return nil } +// GetUnderReplicatedPartitions fetch all under replicated partitions and prints out a summary. +func (c *CLIRunner) GetUnderReplicatedPartitions(ctx context.Context, topics []string) error { + c.startSpinner() + urpsFound := false + + filterTopics := make(map[string]bool) + + topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false) + if err != nil { + c.stopSpinner() + return err + } + + for _, topic := range topics { + filterTopics[topic] = true + } + + allTopicURPs := []admin.TopicURPsInfo{} + for _, topicInfo := range topicsInfo { + if len(topics) != 0 && !filterTopics[topicInfo.Name] { + continue + } + + topicURPs := []admin.PartitionInfo{} + for _, partition := range topicInfo.Partitions { + if len(partition.ISR) < len(partition.Replicas) { + topicURPs = append(topicURPs, partition) + } + } + + if len(topicURPs) == 0 { + continue + } + + allTopicURPs = append(allTopicURPs, admin.TopicURPsInfo{ + Name: topicInfo.Name, + Partitions: topicURPs, + }) + + urpsFound = true + } + c.stopSpinner() + + c.printer("Under Replicated Partitions:\n%s", admin.FormatURPs(allTopicURPs)) + + if urpsFound { + return fmt.Errorf("Topics have Under Replicated Partitions") + } + + return nil +} + +// GetOfflinePartitions fetch all offline partitions and prints out a summary. +func (c *CLIRunner) GetOfflinePartitions(ctx context.Context, topics []string) error { + c.startSpinner() + opsFound := false + + topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false) + if err != nil { + c.stopSpinner() + return err + } + + allTopics := []string{} + for _, topicInfo := range topicsInfo { + allTopics = append(allTopics, topicInfo.Name) + } + + filterTopics := make(map[string]bool) + for _, topic := range topics { + filterTopics[topic] = true + } + + client := c.adminClient.GetConnector().KafkaClient + req := kafka.MetadataRequest{ + Topics: allTopics, + } + + log.Debugf("Metadata request: %+v", req) + metadata, err := client.Metadata(ctx, &req) + if err != nil { + c.stopSpinner() + return err + } + + // If ListenerNotFound Error occurs for leader partition + // this means the partition is offline + var listenerNotFoundError kafka.Error + listenerNotFoundError = 72 + allTopicOPs := []admin.TopicOPsInfo{} + + for _, topicMetadata := range metadata.Topics { + if topicMetadata.Error != nil { + log.Println("topic metadata error:", topicMetadata.Error) + return topicMetadata.Error + } + + if len(topics) != 0 && !filterTopics[topicMetadata.Name] { + continue + } + + topicOPs := []kafka.Partition{} + for _, partition := range topicMetadata.Partitions { + if partition.Leader.Host == "" && partition.Leader.Port == 0 && + listenerNotFoundError.Error() == partition.Error.Error() { + topicOPs = append(topicOPs, partition) + } + } + + if len(topicOPs) == 0 { + continue + } + + allTopicOPs = append(allTopicOPs, admin.TopicOPsInfo{ + Name: topicMetadata.Name, + Partitions: topicOPs, + }) + + opsFound = true + } + + c.stopSpinner() + + c.printer("Offline Partitions:\n%s", admin.FormatOPs(allTopicOPs)) + + if opsFound { + return fmt.Errorf("Topics have Offline Partitions") + } + + return nil +} + // ResetOffsets resets the offsets for a single consumer group / topic combination. func (c *CLIRunner) ResetOffsets( ctx context.Context,