From 5c98b452e1cfb8dc71345e37a2f4fd5f985568d1 Mon Sep 17 00:00:00 2001 From: Damien Gasparina Date: Tue, 12 Jan 2021 16:09:02 +0100 Subject: [PATCH 1/5] Separate collectors & adding collectors for Connect and ksql --- cmd/ccloudexporter/ccloudexporter.go | 2 +- cmd/internal/collector/collector.go | 183 +++++------------ cmd/internal/collector/collector_connector.go | 171 ++++++++++++++++ cmd/internal/collector/collector_kafka.go | 186 ++++++++++++++++++ cmd/internal/collector/collector_ksql.go | 171 ++++++++++++++++ cmd/internal/collector/collector_test.go | 20 +- cmd/internal/collector/context.go | 48 ++++- cmd/internal/collector/context_test.go | 4 +- cmd/internal/collector/descriptor.go | 84 +++++++- cmd/internal/collector/descriptor_test.go | 2 - cmd/internal/collector/optimizer.go | 7 +- cmd/internal/collector/query.go | 117 ++++++++++- cmd/internal/collector/query_test.go | 34 ++-- 13 files changed, 852 insertions(+), 177 deletions(-) create mode 100644 cmd/internal/collector/collector_connector.go create mode 100644 cmd/internal/collector/collector_kafka.go create mode 100644 cmd/internal/collector/collector_ksql.go diff --git a/cmd/ccloudexporter/ccloudexporter.go b/cmd/ccloudexporter/ccloudexporter.go index f595a6e..3b033e8 100644 --- a/cmd/ccloudexporter/ccloudexporter.go +++ b/cmd/ccloudexporter/ccloudexporter.go @@ -11,8 +11,8 @@ import ( "github.com/Dabz/ccloudexporter/cmd/internal/collector" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "net/http" log "github.com/sirupsen/logrus" + "net/http" ) func main() { diff --git a/cmd/internal/collector/collector.go b/cmd/internal/collector/collector.go index 4f8e2b2..730351f 100644 --- a/cmd/internal/collector/collector.go +++ b/cmd/internal/collector/collector.go @@ -8,9 +8,7 @@ package collector // import ( - "fmt" "net/http" - "strconv" "sync" "time" @@ -31,161 +29,80 @@ type CCloudCollectorMetric struct { // CCloudCollector is a custom 
prometheu collector to collect data from // Confluent Cloud Metrics API type CCloudCollector struct { - metrics map[string]CCloudCollectorMetric - rules []Rule -} - -// Describe collect all metrics for ccloudexporter -func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) { - for _, desc := range cc.metrics { - ch <- desc.desc - desc.duration.Describe(ch) - } + metrics map[string]CCloudCollectorMetric + rules []Rule + kafkaCollector *KafkaCCloudCollector + connectorCollector *ConnectorCCloudCollector + ksqlCollector *KsqlCCloudCollector } var ( httpClient http.Client ) +// Describe collect all metrics for ccloudexporter +func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) { + cc.kafkaCollector.Describe(ch) + cc.connectorCollector.Describe(ch) + cc.ksqlCollector.Describe(ch) +} + // Collect all metrics for Prometheus // to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) { var wg sync.WaitGroup - for _, rule := range cc.rules { - for _, metric := range rule.Metrics { - wg.Add(1) - go cc.CollectMetricsForRule(&wg, ch, rule, cc.metrics[metric]) - } - } - + cc.kafkaCollector.Collect(ch, &wg) + cc.connectorCollector.Collect(ch, &wg) + cc.ksqlCollector.Collect(ch, &wg) wg.Wait() } -// CollectMetricsForRule collects all metrics for a specific rule -func (cc CCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) { - defer wg.Done() - query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics) - log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created") - optimizedQuery, additionalLabels := OptimizeQuery(query) - log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized") - durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id)) - 
timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set)) - response, err := SendQuery(optimizedQuery) - timer.ObserveDuration() - ch <- durationMetric - if err != nil { - log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed") - return - } - log.WithFields(log.Fields{"response": response}).Traceln("Response has been received") - cc.handleResponse(response, ccmetric, ch, rule, additionalLabels) -} - -func (cc CCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) { - desc := ccmetric.desc - for _, dataPoint := range response.Data { - // Some data points might need to be ignored if it is the global query - topic, topicPresent := dataPoint["metric.label.topic"].(string) - cluster, clusterPresent := dataPoint["metric.label.cluster_id"].(string) - - if !clusterPresent { - cluster, clusterPresent = additionalLabels["metric.label.cluster_id"] - } - - if !topicPresent { - topic, topicPresent = additionalLabels["metric.label.topic"] - } - - if topicPresent && clusterPresent && rule.ShouldIgnoreResultForRule(topic, cluster, ccmetric.metric.Name) { - continue - } - - value, ok := dataPoint["value"].(float64) - if !ok { - log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float") - return - } - - labels := []string{} - for _, label := range ccmetric.labels { - labelValue, labelValuePresent := dataPoint["metric.label."+label].(string) - if !labelValuePresent { - labelValue, labelValuePresent = additionalLabels["metric.label."+label] - } - labels = append(labels, labelValue) - } - - metric := prometheus.MustNewConstMetric( - desc, - prometheus.GaugeValue, - value, - labels..., - ) - - if Context.NoTimestamp { - ch <- metric - } else { - timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"])) - if err != nil { - 
log.WithError(err).Errorln("Can not parse timestamp, ignoring the response") - return - } - metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric) - ch <- metricWithTime - } - } -} - // NewCCloudCollector creates a new instance of the collector // During the creation, we invoke the descriptor endpoint to fetcha all // existing metrics and their labels func NewCCloudCollector() CCloudCollector { - collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)} - descriptorResponse := SendDescriptorQuery() - log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received") - mapOfWhiteListedMetrics := Context.GetMapOfMetrics() - - for _, metr := range descriptorResponse.Data { - _, metricPresent := mapOfWhiteListedMetrics[metr.Name] - if !metricPresent { - continue - } - delete(mapOfWhiteListedMetrics, metr.Name) - metr.Labels = append(metr.Labels, MetricLabel{Key: "cluster_id", Description: "Cluster ID"}) - var labels []string - for _, metrLabel := range metr.Labels { - labels = append(labels, metrLabel.Key) - } - desc := prometheus.NewDesc( - "ccloud_metric_"+GetNiceNameForMetric(metr), - metr.Description, - labels, - nil, - ) - - requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ccloud_metrics_api_request_latency", - Help: "Metrics API request latency", - ConstLabels: map[string]string{"metric": metr.Name}, - }, []string{"ruleNumber"}) - - metric := CCloudCollectorMetric{ - metric: metr, - desc: desc, - duration: requestDuration, - labels: labels, + log.Traceln("Creating http client") + httpClient = http.Client{ + Timeout: time.Second * time.Duration(Context.HTTPTimeout), + } + + var ( + connectorResource ResourceDescription + kafkaResource ResourceDescription + ksqlResource ResourceDescription + ) + resourceDescription := SendResourceDescriptorQuery() + for _, resource := range resourceDescription.Data { + if 
resource.Type == "connector" { + connectorResource = resource + } else if resource.Type == "kafka" { + kafkaResource = resource + } else if resource.Type == "ksql" { + ksqlResource = resource } - collector.metrics[metr.Name] = metric } - httpClient = http.Client{ - Timeout: time.Second * time.Duration(Context.HTTPTimeout), + if connectorResource.Type == "" { + log.WithField("descriptorResponse", resourceDescription).Fatalln("No connector resource available") } - if len(mapOfWhiteListedMetrics) > 0 { - log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API") + if kafkaResource.Type == "" { + log.WithField("descriptorResponse", resourceDescription).Fatalln("No kafka resource available") } + if ksqlResource.Type == "" { + log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available") + } + + collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)} + kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource) + connectorCollector := NewConnectorCCloudCollector(collector, connectorResource) + ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource) + + collector.kafkaCollector = &kafkaCollector + collector.connectorCollector = &connectorCollector + collector.ksqlCollector = &ksqlCollector + return collector } diff --git a/cmd/internal/collector/collector_connector.go b/cmd/internal/collector/collector_connector.go new file mode 100644 index 0000000..fcd9dd1 --- /dev/null +++ b/cmd/internal/collector/collector_connector.go @@ -0,0 +1,171 @@ +package collector + +// +// collector.go +// Copyright (C) 2020 gaspar_d +// +// Distributed under terms of the MIT license. 
// + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +// ConnectorCCloudCollector is a custom prometheus collector to collect data from +// Confluent Cloud Metrics API. It fetches Connector resource type metrics +type ConnectorCCloudCollector struct { + metrics map[string]CCloudCollectorMetric + rules []Rule + ccloud CCloudCollector + resource ResourceDescription +} + +// Describe collect all metrics for ccloudexporter +func (cc ConnectorCCloudCollector) Describe(ch chan<- *prometheus.Desc) { + for _, desc := range cc.metrics { + ch <- desc.desc + desc.duration.Describe(ch) + } +} + +// Collect all metrics for Prometheus +// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine +func (cc ConnectorCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) { + for _, rule := range cc.rules { + for _, metric := range rule.Metrics { + _, present := cc.metrics[metric] + if !present { + continue + } + + if len(rule.Connectors) <= 0 { + log.WithFields(log.Fields{"rule": rule}).Errorln("Connector rule has no connector specified") + continue + } + + wg.Add(1) + go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric]) + } + } +} + +// CollectMetricsForRule collects all metrics for a specific rule +func (cc ConnectorCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) { + defer wg.Done() + query := BuildConnectorsQuery(ccmetric.metric, rule.Connectors, cc.resource) + log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created") + optimizedQuery, additionalLabels := OptimizeQuery(query) + log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized") + durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id)) + timer := 
prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set)) + response, err := SendQuery(optimizedQuery) + timer.ObserveDuration() + ch <- durationMetric + if err != nil { + log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed") + return + } + log.WithFields(log.Fields{"response": response}).Traceln("Response has been received") + cc.handleResponse(response, ccmetric, ch, rule, additionalLabels) +} + +func (cc ConnectorCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) { + desc := ccmetric.desc + for _, dataPoint := range response.Data { + value, ok := dataPoint["value"].(float64) + if !ok { + log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float") + return + } + + labels := []string{} + for _, label := range ccmetric.labels { + name := cc.resource.datapointFieldNameForLabel(label) + labelValue, labelValuePresent := dataPoint[name].(string) + if !labelValuePresent { + labelValue, labelValuePresent = additionalLabels[name] + } + labels = append(labels, labelValue) + } + + metric := prometheus.MustNewConstMetric( + desc, + prometheus.GaugeValue, + value, + labels..., + ) + + if Context.NoTimestamp { + ch <- metric + } else { + timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"])) + if err != nil { + log.WithError(err).Errorln("Can not parse timestamp, ignoring the response") + return + } + metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric) + ch <- metricWithTime + } + } +} + +func NewConnectorCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) ConnectorCCloudCollector { + collector := ConnectorCCloudCollector{ + rules: Context.GetConnectorRules(), + metrics: make(map[string]CCloudCollectorMetric), + ccloud: ccloudcollecter, + resource: resource, + } + descriptorResponse := 
SendDescriptorQuery(resource.Type) + log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received") + mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.connect") + + for _, metr := range descriptorResponse.Data { + _, metricPresent := mapOfWhiteListedMetrics[metr.Name] + if !metricPresent { + continue + } + delete(mapOfWhiteListedMetrics, metr.Name) + var labels []string + for _, metrLabel := range metr.Labels { + labels = append(labels, metrLabel.Key) + } + + for _, rsrcLabel := range resource.Labels { + labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key)) + } + + desc := prometheus.NewDesc( + "ccloud_metric_connector_"+GetNiceNameForMetric(metr), + metr.Description, + labels, + nil, + ) + + requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccloud_metrics_api_request_latency", + Help: "Metrics API request latency", + ConstLabels: map[string]string{"metric": metr.Name}, + }, []string{"ruleNumber"}) + + metric := CCloudCollectorMetric{ + metric: metr, + desc: desc, + duration: requestDuration, + labels: labels, + } + collector.metrics[metr.Name] = metric + } + + if len(mapOfWhiteListedMetrics) > 0 { + log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API") + } + + return collector +} diff --git a/cmd/internal/collector/collector_kafka.go b/cmd/internal/collector/collector_kafka.go new file mode 100644 index 0000000..331f1a7 --- /dev/null +++ b/cmd/internal/collector/collector_kafka.go @@ -0,0 +1,186 @@ +package collector + +// +// collector.go +// Copyright (C) 2020 gaspar_d +// +// Distributed under terms of the MIT license. 
+// + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +// KafkaCCloudCollector is a custom prometheu collector to collect data from +// Confluent Cloud Metrics API. It fetches Kafka resources types metrics +type KafkaCCloudCollector struct { + metrics map[string]CCloudCollectorMetric + rules []Rule + ccloud CCloudCollector + resource ResourceDescription +} + +// Describe collect all metrics for ccloudexporter +func (cc KafkaCCloudCollector) Describe(ch chan<- *prometheus.Desc) { + for _, desc := range cc.metrics { + ch <- desc.desc + desc.duration.Describe(ch) + } +} + +// Collect all metrics for Prometheus +// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine +func (cc KafkaCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) { + for _, rule := range cc.rules { + for _, metric := range rule.Metrics { + _, present := cc.metrics[metric] + if !present { + continue + } + if len(rule.Clusters) <= 0 { + log.WithFields(log.Fields{"rule": rule}).Errorln("Kafka rule has no cluster specified") + continue + } + + wg.Add(1) + go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric]) + } + } +} + +// CollectMetricsForRule collects all metrics for a specific rule +func (cc KafkaCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) { + defer wg.Done() + query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics, cc.resource) + log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created") + optimizedQuery, additionalLabels := OptimizeQuery(query) + log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized") + durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id)) + timer := 
prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set)) + response, err := SendQuery(optimizedQuery) + timer.ObserveDuration() + ch <- durationMetric + if err != nil { + log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed") + return + } + log.WithFields(log.Fields{"response": response}).Traceln("Response has been received") + cc.handleResponse(response, ccmetric, ch, rule, additionalLabels) +} + +func (cc KafkaCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) { + desc := ccmetric.desc + for _, dataPoint := range response.Data { + // Some data points might need to be ignored if it is the global query + topic, topicPresent := dataPoint["metric.topic"].(string) + cluster, clusterPresent := dataPoint["resource.kafka.id"].(string) + + if !clusterPresent { + cluster, clusterPresent = additionalLabels["resource.kafka.id"] + } + + if !topicPresent { + topic, topicPresent = additionalLabels["metric.topic"] + } + + if topicPresent && clusterPresent && rule.ShouldIgnoreResultForRule(topic, cluster, ccmetric.metric.Name) { + continue + } + + value, ok := dataPoint["value"].(float64) + if !ok { + log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float") + return + } + + labels := []string{} + for _, label := range ccmetric.labels { + name := cc.resource.datapointFieldNameForLabel(label) + labelValue, labelValuePresent := dataPoint[name].(string) + if !labelValuePresent { + labelValue, labelValuePresent = additionalLabels[name] + } + labels = append(labels, labelValue) + } + + metric := prometheus.MustNewConstMetric( + desc, + prometheus.GaugeValue, + value, + labels..., + ) + + if Context.NoTimestamp { + ch <- metric + } else { + timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"])) + if err != nil { + 
log.WithError(err).Errorln("Can not parse timestamp, ignoring the response") + return + } + metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric) + ch <- metricWithTime + } + } +} + +func NewKafkaCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) KafkaCCloudCollector { + collector := KafkaCCloudCollector{ + rules: Context.GetKafkaRules(), + metrics: make(map[string]CCloudCollectorMetric), + resource: resource, + ccloud: ccloudcollecter, + } + descriptorResponse := SendDescriptorQuery(resource.Type) + log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received") + mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.server") + + for _, metr := range descriptorResponse.Data { + _, metricPresent := mapOfWhiteListedMetrics[metr.Name] + if !metricPresent { + continue + } + delete(mapOfWhiteListedMetrics, metr.Name) + + var labels []string + for _, rsrcLabel := range resource.Labels { + labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key)) + } + for _, metrLabel := range metr.Labels { + labels = append(labels, metrLabel.Key) + } + + desc := prometheus.NewDesc( + "ccloud_metric_"+GetNiceNameForMetric(metr), + metr.Description, + labels, + nil, + ) + + requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccloud_metrics_api_request_latency", + Help: "Metrics API request latency", + ConstLabels: map[string]string{"metric": metr.Name}, + }, []string{"ruleNumber"}) + + metric := CCloudCollectorMetric{ + metric: metr, + desc: desc, + duration: requestDuration, + labels: labels, + } + collector.metrics[metr.Name] = metric + } + + if len(mapOfWhiteListedMetrics) > 0 { + log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API") + } + + return collector +} diff --git a/cmd/internal/collector/collector_ksql.go 
b/cmd/internal/collector/collector_ksql.go new file mode 100644 index 0000000..97d5d8f --- /dev/null +++ b/cmd/internal/collector/collector_ksql.go @@ -0,0 +1,171 @@ +package collector + +// +// collector_ksql.go +// Copyright (C) 2020 gaspar_d +// +// Distributed under terms of the MIT license. +// + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +// KsqlCCloudCollector is a custom prometheus collector to collect data from +// Confluent Cloud Metrics API. It fetches KSQL resources types metrics +type KsqlCCloudCollector struct { + metrics map[string]CCloudCollectorMetric + rules []Rule + ccloud CCloudCollector + resource ResourceDescription +} + +// Describe collect all metrics for ccloudexporter +func (cc KsqlCCloudCollector) Describe(ch chan<- *prometheus.Desc) { + for _, desc := range cc.metrics { + ch <- desc.desc + desc.duration.Describe(ch) + } +} + +// Collect all metrics for Prometheus +// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine +func (cc KsqlCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) { + for _, rule := range cc.rules { + for _, metric := range rule.Metrics { + _, present := cc.metrics[metric] + if !present { + continue + } + + if len(rule.Ksql) <= 0 { + log.WithFields(log.Fields{"rule": rule}).Errorln("Ksql rule has no ksqlDB application specified") + continue + } + + wg.Add(1) + go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric]) + } + } +} + +// CollectMetricsForRule collects all metrics for a specific rule +func (cc KsqlCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) { + defer wg.Done() + query := BuildKsqlQuery(ccmetric.metric, rule.Ksql, cc.resource) + log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created") + optimizedQuery, additionalLabels := OptimizeQuery(query) + 
log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized") + durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id)) + timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set)) + response, err := SendQuery(optimizedQuery) + timer.ObserveDuration() + ch <- durationMetric + if err != nil { + log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed") + return + } + log.WithFields(log.Fields{"response": response}).Traceln("Response has been received") + cc.handleResponse(response, ccmetric, ch, rule, additionalLabels) +} + +func (cc KsqlCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) { + desc := ccmetric.desc + for _, dataPoint := range response.Data { + value, ok := dataPoint["value"].(float64) + if !ok { + log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float") + return + } + + labels := []string{} + for _, label := range ccmetric.labels { + name := cc.resource.datapointFieldNameForLabel(label) + labelValue, labelValuePresent := dataPoint[name].(string) + if !labelValuePresent { + labelValue, labelValuePresent = additionalLabels[name] + } + labels = append(labels, labelValue) + } + + metric := prometheus.MustNewConstMetric( + desc, + prometheus.GaugeValue, + value, + labels..., + ) + + if Context.NoTimestamp { + ch <- metric + } else { + timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"])) + if err != nil { + log.WithError(err).Errorln("Can not parse timestamp, ignoring the response") + return + } + metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric) + ch <- metricWithTime + } + } +} + +func NewKsqlCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) 
KsqlCCloudCollector { + collector := KsqlCCloudCollector{ + rules: Context.GetKsqlRules(), + metrics: make(map[string]CCloudCollectorMetric), + ccloud: ccloudcollecter, + resource: resource, + } + descriptorResponse := SendDescriptorQuery(resource.Type) + log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received") + mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.ksql") + + for _, metr := range descriptorResponse.Data { + _, metricPresent := mapOfWhiteListedMetrics[metr.Name] + if !metricPresent { + continue + } + delete(mapOfWhiteListedMetrics, metr.Name) + var labels []string + for _, metrLabel := range metr.Labels { + labels = append(labels, metrLabel.Key) + } + + for _, rsrcLabel := range resource.Labels { + labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key)) + } + + desc := prometheus.NewDesc( + "ccloud_metric_ksql_"+GetNiceNameForMetric(metr), + metr.Description, + labels, + nil, + ) + + requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccloud_metrics_api_request_latency", + Help: "Metrics API request latency", + ConstLabels: map[string]string{"metric": metr.Name}, + }, []string{"ruleNumber"}) + + metric := CCloudCollectorMetric{ + metric: metr, + desc: desc, + duration: requestDuration, + labels: labels, + } + collector.metrics[metr.Name] = metric + } + + if len(mapOfWhiteListedMetrics) > 0 { + log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API") + } + + return collector +} diff --git a/cmd/internal/collector/collector_test.go b/cmd/internal/collector/collector_test.go index 8ec582f..09ad6a6 100644 --- a/cmd/internal/collector/collector_test.go +++ b/cmd/internal/collector/collector_test.go @@ -18,15 +18,23 @@ import ( func TestHandleResponse(t *testing.T) { metric := CCloudCollectorMetric{ - labels: []string{"topic", 
"cluster_id"}, + labels: []string{"topic", "kafka_id"}, metric: MetricDescription{Name: "metric"}, - desc: prometheus.NewDesc("metric", "help", []string{"topic", "cluster_id"}, nil), + desc: prometheus.NewDesc("metric", "help", []string{"topic", "kafka_id"}, nil), } - collector := CCloudCollector{ + collector := KafkaCCloudCollector{ metrics: map[string]CCloudCollectorMetric{ "metric": metric, }, + resource: ResourceDescription{ + Type: "kafka", + Labels: []MetricLabel{ + { + Key: "kafka.id", + }, + }, + }, } var rule = Rule{ @@ -34,7 +42,7 @@ func TestHandleResponse(t *testing.T) { Topics: []string{"topic"}, Clusters: []string{"cluster"}, Metrics: []string{"metric", "metric2"}, - GroupByLabels: []string{"topic", "cluster_id"}, + GroupByLabels: []string{"topic", "kafka_id"}, } responseString := ` @@ -42,12 +50,13 @@ func TestHandleResponse(t *testing.T) { "data": [ { "metric.label.cluster_id": "cluster", + "resource.kafka.id": "cluster", "metric.label.topic": "topic", "timestamp": "2020-06-03T13:37:00Z", "value": 1.0 }, { - "metric.label.cluster_id": "cluster", + "resource.kafka.id": "cluster", "metric.label.topic": "topic2", "timestamp": "2020-06-03T13:37:00Z", "value": 1.0 @@ -78,5 +87,4 @@ func TestHandleResponse(t *testing.T) { t.Fail() return } - } diff --git a/cmd/internal/collector/context.go b/cmd/internal/collector/context.go index 784e166..7a4f6ea 100644 --- a/cmd/internal/collector/context.go +++ b/cmd/internal/collector/context.go @@ -7,6 +7,8 @@ package collector // Distributed under terms of the MIT license. 
// +import "strings" + // ExporterContext define the global context for ccloudexporter // This global variables define all timeout, user configuration, // and cluster information @@ -25,6 +27,8 @@ type ExporterContext struct { type Rule struct { Topics []string `mapstructure:"topics"` Clusters []string `mapstructure:"clusters"` + Connectors []string `mapstructure:"connectors"` + Ksql []string `mapstructure:"ksqls"` Metrics []string `mapstructure:"metrics"` GroupByLabels []string `mapstructure:"labels"` cachedIgnoreGlobalResultForTopic map[TopicClusterMetric]bool @@ -39,7 +43,7 @@ type TopicClusterMetric struct { } // Version is the git short SHA1 hash provided at build time -var Version string = "homecooked" +var Version = "homecooked" // Context is the global variable defining the context for the expoter var Context = ExporterContext{} @@ -66,12 +70,14 @@ var DefaultMetrics = []string{ // GetMapOfMetrics returns the whitelist of metrics in a map // where the key is the metric and the value is true if it is comming from an override -func (context ExporterContext) GetMapOfMetrics() map[string]bool { +func (context ExporterContext) GetMapOfMetrics(prefix string) map[string]bool { mapOfWhiteListedMetrics := make(map[string]bool) for _, rule := range Context.Rules { for _, metric := range rule.Metrics { - mapOfWhiteListedMetrics[metric] = true + if strings.HasPrefix(metric, prefix) { + mapOfWhiteListedMetrics[metric] = true + } } } @@ -92,6 +98,42 @@ func (context ExporterContext) GetMetrics() []string { return metrics } +// GetKafkaRules return all rules associated to a Kafka cluster +func (context ExporterContext) GetKafkaRules() []Rule { + kafkaRules := make([]Rule, 0) + for _, irule := range Context.Rules { + if len(irule.Clusters) >= 0 { + kafkaRules = append(kafkaRules, irule) + } + } + + return kafkaRules +} + +// GetConnectorRules return all rules associated to at least one connector +func (context ExporterContext) GetConnectorRules() []Rule { + connectorRule 
:= make([]Rule, 0) + for _, irule := range Context.Rules { + if len(irule.Connectors) >= 0 { + connectorRule = append(connectorRule, irule) + } + } + + return connectorRule +} + +// GetKsqlRules return all rules associated to at least one ksql application +func (context ExporterContext) GetKsqlRules() []Rule { + ksqlRules := make([]Rule, 0) + for _, irule := range Context.Rules { + if len(irule.Ksql) >= 0 { + ksqlRules = append(ksqlRules, irule) + } + } + + return ksqlRules +} + // ShouldIgnoreResultForRule returns true if the result for this topic need to be ignored for this rule. // Some results might be ignored as they are defined in another rule, thus global and override result // could conflict if we do not ignore the global result diff --git a/cmd/internal/collector/context_test.go b/cmd/internal/collector/context_test.go index 9bdf9e1..e36e502 100644 --- a/cmd/internal/collector/context_test.go +++ b/cmd/internal/collector/context_test.go @@ -71,8 +71,8 @@ func TestGetMapOfMetrics(t *testing.T) { Rules: []Rule{rule1, rule2}, } - if len(Context.GetMapOfMetrics()) != 3 { - t.Errorf("Unexpected number of metric returned: %+v", Context.GetMapOfMetrics()) + if len(Context.GetMapOfMetrics("")) != 3 { + t.Errorf("Unexpected number of metric returned: %+v", Context.GetMapOfMetrics("")) t.Fail() } } diff --git a/cmd/internal/collector/descriptor.go b/cmd/internal/collector/descriptor.go index b0833b5..e38f0d4 100644 --- a/cmd/internal/collector/descriptor.go +++ b/cmd/internal/collector/descriptor.go @@ -12,10 +12,10 @@ import "encoding/json" import "io/ioutil" import log "github.com/sirupsen/logrus" -// DescriptorResponse is the response from Confluent Cloud API metric endpoint +// DescriptorMetricResponse is the response from Confluent Cloud API metric endpoint // This is the JSON structure for the endpoint // https://api.telemetry.confluent.cloud/v1/metrics/cloud/descriptors -type DescriptorResponse struct { +type DescriptorMetricResponse struct { Data 
[]MetricDescription `json:"data"` } @@ -40,12 +40,27 @@ type MetricLabel struct { Description string `json:"description"` } +// DescriptorResourceResponse is the result of the Metrics API resource description +type DescriptorResourceResponse struct { + Data []ResourceDescription `json:"data"` +} + +// ResourceDescription describes one resource returned by the Metrics API +type ResourceDescription struct { + Type string `json:"type"` + Description string `json:"description"` + Labels []MetricLabel `json:"labels"` +} + var ( excludeListForMetric = map[string]string{ - "io.confluent.kafka.server": "", - "delta": "", + "io.confluent.kafka.server": "", + "io.confluent.kafka.connect": "", + "io.confluent.kafka.ksql": "", + "delta": "", } - descriptorURI = "/v1/metrics/cloud/descriptors" + descriptorURI = "v2/metrics/cloud/descriptors/metrics" + descriptorResourceURI = "v2/metrics/cloud/descriptors/resources" ) // Return true if the metric has this label @@ -58,6 +73,25 @@ func (metric MetricDescription) hasLabel(label string) bool { return false } +// Return true if the resource has this label +func (resource ResourceDescription) hasLabel(label string) bool { + stripLabel := strings.Replace(strings.Replace(label, "resource.", "", 1), ".", "_", -1) + for _, l := range resource.Labels { + stripKey := strings.Replace(strings.Replace(l.Key, "resource.", "", 1), ".", "_", -1) + if stripKey == stripLabel { + return true + } + } + return false +} + +func (resource ResourceDescription) datapointFieldNameForLabel(label string) string { + if resource.hasLabel(label) { + return "resource." + strings.Replace(label, "_", ".", -1) + } + return "metric." 
+ label +} + // GetNiceNameForMetric returns a human friendly metric name from a Confluent Cloud API metric func GetNiceNameForMetric(metric MetricDescription) string { splits := strings.Split(metric.Name, "/") @@ -72,10 +106,42 @@ func GetNiceNameForMetric(metric MetricDescription) string { panic(nil) } -// SendDescriptorQuery calls the https://api.telemetry.confluent.cloud/v1/metrics/cloud/descriptors endpoint +// GetPrometheusNameForLabel returns a prometheus friendly name for a label +func GetPrometheusNameForLabel(label string) string { + return strings.Join(strings.Split(label, "."), "_") +} + +// SendDescriptorQuery calls the https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors endpoint // to retrieve the list of metrics -func SendDescriptorQuery() DescriptorResponse { - endpoint := Context.HTTPBaseURL + descriptorURI +func SendDescriptorQuery(ressourceType string) DescriptorMetricResponse { + endpoint := Context.HTTPBaseURL + descriptorURI + "?resource_type=" + ressourceType + req := MustGetNewRequest("GET", endpoint, nil) + + res, err := httpClient.Do(req) + if err != nil { + log.WithError(err).Fatalln("HTTP query for the descriptor endpoint failed") + } + + if res.StatusCode != 200 { + body, _ := ioutil.ReadAll(res.Body) + log.WithFields(log.Fields{"StatusCode": res.StatusCode, "Endpoint": endpoint, "body": body}).Fatalf("Received status code %d instead of 200 for GET on %s. 
\n\n%s\n\n", res.StatusCode, endpoint, body) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + log.WithError(err).Fatalln("Can not read the content of the descriptor query") + } + + response := DescriptorMetricResponse{} + json.Unmarshal(body, &response) + + return response +} + +// SendResourceDescriptorQuery calls the https://api.telemetry.confluent.cloud/v2/metrics/cloud/descriptors endpoint +// to retrieve the list of available resources +func SendResourceDescriptorQuery() DescriptorResourceResponse { + endpoint := Context.HTTPBaseURL + descriptorResourceURI req := MustGetNewRequest("GET", endpoint, nil) res, err := httpClient.Do(req) @@ -93,7 +159,7 @@ func SendDescriptorQuery() DescriptorResponse { log.WithError(err).Fatalln("Can not read the content of the descriptor query") } - response := DescriptorResponse{} + response := DescriptorResourceResponse{} json.Unmarshal(body, &response) return response diff --git a/cmd/internal/collector/descriptor_test.go b/cmd/internal/collector/descriptor_test.go index 56c3adf..8f158d1 100644 --- a/cmd/internal/collector/descriptor_test.go +++ b/cmd/internal/collector/descriptor_test.go @@ -42,5 +42,3 @@ func TestHasLabel(t *testing.T) { t.Fail() } } - - diff --git a/cmd/internal/collector/optimizer.go b/cmd/internal/collector/optimizer.go index bd55c24..397051c 100644 --- a/cmd/internal/collector/optimizer.go +++ b/cmd/internal/collector/optimizer.go @@ -20,13 +20,14 @@ func OptimizeQuery(input Query) (Query, map[string]string) { return optimizedQuery, optimizedLabel } - +// If there is an equality filtering on a field, there is no need +// to group by this label. 
func removeSuperfluousGoupBy(input Query) (Query, map[string]string) { optimizedGroupByList := make([]string, 0) labels := make(map[string]string) for _, groupBy := range input.GroupBy { filters := equalityFiltering(input.Filter.Filters, groupBy, input.Filter.Op) - if (len(filters) != 1) { + if len(filters) != 1 { optimizedGroupByList = append(optimizedGroupByList, groupBy) } else { labels[groupBy] = filters[0].Value @@ -45,7 +46,7 @@ func equalityFiltering(filters []Filter, metric string, parentOp string) []Filte continue } - if (filter.Field == metric && parentOp == "OR" && filter.Op == "EQ") { + if filter.Field == metric && parentOp == "OR" && filter.Op == "EQ" { possibilities = append(possibilities, filter) } } diff --git a/cmd/internal/collector/query.go b/cmd/internal/collector/query.go index b8961eb..2a3664f 100644 --- a/cmd/internal/collector/query.go +++ b/cmd/internal/collector/query.go @@ -7,7 +7,10 @@ package collector // Distributed under terms of the MIT license. // -import "time" +import ( + "strings" + "time" +) import "fmt" import "bytes" import "errors" @@ -64,12 +67,12 @@ type QueryResponse struct { // } var ( - queryURI = "/v1/metrics/cloud/query" + queryURI = "/v2/metrics/cloud/query" ) // BuildQuery creates a new Query for a metric for a specific cluster and time interval // This function will return the main global query, override queries will not be generated -func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []string, topicFiltering []string) Query { +func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []string, topicFiltering []string, resource ResourceDescription) Query { timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay @@ -83,7 +86,7 @@ func BuildQuery(metric 
MetricDescription, clusters []string, groupByLabels []str clusterFilters := make([]Filter, 0) for _, cluster := range clusters { clusterFilters = append(clusterFilters, Filter{ - Field: "metric.label.cluster_id", + Field: "resource.kafka.id", Op: "EQ", Value: cluster, }) @@ -97,7 +100,7 @@ func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []str topicFilters := make([]Filter, 0) for _, topic := range topicFiltering { topicFilters = append(topicFilters, Filter{ - Field: "metric.label.topic", + Field: "metric.topic", Op: "EQ", Value: topic, }) @@ -117,10 +120,110 @@ func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []str groupBy := []string{} for _, label := range metric.Labels { if contains(groupByLabels, label.Key) { - groupBy = append(groupBy, "metric.label."+label.Key) + if resource.hasLabel(label.Key) { + groupBy = append(groupBy, "resource."+strings.Replace(label.Key, "_", ".", -1)) + } else { + groupBy = append(groupBy, "metric."+label.Key) + } } } + for _, label := range resource.Labels { + groupBy = append(groupBy, "resource."+label.Key) + } + + return Query{ + Aggreations: []Aggregation{aggregation}, + Filter: filterHeader, + Granularity: Context.Granularity, + GroupBy: groupBy, + Limit: 1000, + Intervals: []string{fmt.Sprintf("%s/%s", timeFrom.Format(time.RFC3339), Context.Granularity)}, + } +} + +// BuildConnectorsQuery creates a new Query for a metric for a specific cluster and time interval +// This function will return the main global query, override queries will not be generated +func BuildConnectorsQuery(metric MetricDescription, connectors []string, resource ResourceDescription) Query { + timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized + timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay + + aggregation := Aggregation{ + Agg: 
"SUM", + Metric: metric.Name, + } + + filters := make([]Filter, 0) + + connectorFilters := make([]Filter, 0) + for _, connector := range connectors { + connectorFilters = append(connectorFilters, Filter{ + Field: "resource.connector.id", + Op: "EQ", + Value: connector, + }) + } + + filters = append(filters, Filter{ + Op: "OR", + Filters: connectorFilters, + }) + + filterHeader := FilterHeader{ + Op: "AND", + Filters: filters, + } + + groupBy := make([]string, len(resource.Labels)) + for i, rsrcLabel := range resource.Labels { + groupBy[i] = "resource." + rsrcLabel.Key + } + + return Query{ + Aggreations: []Aggregation{aggregation}, + Filter: filterHeader, + Granularity: Context.Granularity, + GroupBy: groupBy, + Limit: 1000, + Intervals: []string{fmt.Sprintf("%s/%s", timeFrom.Format(time.RFC3339), Context.Granularity)}, + } +} + +func BuildKsqlQuery(metric MetricDescription, ksqlAppIds []string, resource ResourceDescription) Query { + timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized + timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay + + aggregation := Aggregation{ + Agg: "SUM", + Metric: metric.Name, + } + + filters := make([]Filter, 0) + + connectorFilters := make([]Filter, 0) + for _, ksqlId := range ksqlAppIds { + connectorFilters = append(connectorFilters, Filter{ + Field: "resource.ksql.id", + Op: "EQ", + Value: ksqlId, + }) + } + + filters = append(filters, Filter{ + Op: "OR", + Filters: connectorFilters, + }) + + filterHeader := FilterHeader{ + Op: "AND", + Filters: filters, + } + + groupBy := make([]string, len(resource.Labels)) + for i, rsrcLabel := range resource.Labels { + groupBy[i] = "resource." 
+ rsrcLabel.Key + } + return Query{ Aggreations: []Aggregation{aggregation}, Filter: filterHeader, @@ -136,7 +239,7 @@ func SendQuery(query Query) (QueryResponse, error) { jsonQuery, err := json.Marshal(query) if err != nil { log.WithError(err).Errorln("Failed serialize query in JSON") - return QueryResponse{}, errors.New("Failed serialize query in JSON") + return QueryResponse{}, errors.New("failed serializing query in JSON") } endpoint := Context.HTTPBaseURL + queryURI req := MustGetNewRequest("POST", endpoint, bytes.NewBuffer(jsonQuery)) diff --git a/cmd/internal/collector/query_test.go b/cmd/internal/collector/query_test.go index f9d94cc..bf5d18e 100644 --- a/cmd/internal/collector/query_test.go +++ b/cmd/internal/collector/query_test.go @@ -11,26 +11,38 @@ import "testing" import "strings" import "time" +var ( + resource = ResourceDescription{ + Type: "kafka", + Description: "", + Labels: []MetricLabel{ + { + Key: "kafka.id", + }, + }, + } +) + func TestBuildQuery(t *testing.T) { metric := MetricDescription{ Name: "io.confluent.kafka.server/retained_bytes", Labels: []MetricLabel{{ Key: "topic", }, { - Key: "cluster_id", + Key: "kafka_id", }, { Key: "partition", }}, } - query := BuildQuery(metric, []string{"cluster"}, []string{"cluster_id", "topic"}, nil) + query := BuildQuery(metric, []string{"cluster"}, []string{"kafka_id", "topic"}, nil, resource) if len(query.Filter.Filters) != 1 || len(query.Filter.Filters[0].Filters) != 1 { t.Fail() return } - if query.Filter.Filters[0].Filters[0].Field != "metric.label.cluster_id" { + if query.Filter.Filters[0].Filters[0].Field != "resource.kafka.id" { t.Fail() return } @@ -58,20 +70,20 @@ func TestBuildQueryWithTopic(t *testing.T) { Labels: []MetricLabel{{ Key: "topic", }, { - Key: "cluster_id", + Key: "kafka_id", }, { Key: "partition", }}, } - query := BuildQuery(metric, []string{"cluster"}, []string{"cluster_id", "topic", "partition"}, []string{"topic"}) + query := BuildQuery(metric, []string{"cluster"}, 
[]string{"kafka_id", "topic", "partition"}, []string{"topic"}, resource) if len(query.Filter.Filters) != 2 || len(query.Filter.Filters[1].Filters) != 1 { t.Fail() return } - if query.Filter.Filters[0].Filters[0].Field != "metric.label.cluster_id" { + if query.Filter.Filters[0].Filters[0].Field != "resource.kafka.id" { t.Fail() return } @@ -81,7 +93,7 @@ func TestBuildQueryWithTopic(t *testing.T) { return } - if query.Filter.Filters[1].Filters[0].Field != "metric.label.topic" { + if query.Filter.Filters[1].Filters[0].Field != "metric.topic" { t.Fail() return } @@ -98,13 +110,13 @@ func TestOptimizationRemoveSuperfelousGroupBy(t *testing.T) { Labels: []MetricLabel{{ Key: "topic", }, { - Key: "cluster_id", + Key: "kafka_id", }, { Key: "partition", }}, } - query, _ := OptimizeQuery(BuildQuery(metric, []string{"cluster"}, []string{"cluster_id", "topic"}, nil)) + query, _ := OptimizeQuery(BuildQuery(metric, []string{"cluster"}, []string{"kafka_id", "topic"}, nil, resource)) if len(query.GroupBy) > 1 { t.Errorf("Unexepected groupBy list: %s\n", query.GroupBy) @@ -119,13 +131,13 @@ func TestOptimizationDoesNotRemoveRequiredGroupBy(t *testing.T) { Labels: []MetricLabel{{ Key: "topic", }, { - Key: "cluster_id", + Key: "kafka_id", }, { Key: "partition", }}, } - query, _ := OptimizeQuery(BuildQuery(metric, []string{"cluster1", "cluster2"}, []string{"cluster_id", "topic"}, nil)) + query, _ := OptimizeQuery(BuildQuery(metric, []string{"cluster1", "cluster2"}, []string{"kafka_id", "topic"}, nil, resource)) if len(query.GroupBy) <= 1 { t.Errorf("Unexepected groupBy list: %s\n", query.GroupBy) From df10c22c685215f0e3fe60ff5ba78df56c089f6e Mon Sep 17 00:00:00 2001 From: Damien Gasparina Date: Tue, 12 Jan 2021 17:27:17 +0100 Subject: [PATCH 2/5] Golint & updated Grafana dashboard --- cmd/internal/collector/collector_connector.go | 1 + cmd/internal/collector/collector_kafka.go | 1 + cmd/internal/collector/collector_ksql.go | 1 + cmd/internal/collector/query.go | 8 +- 
grafana/ccloud-exporter.json | 1859 ++++++++++++----- 5 files changed, 1311 insertions(+), 559 deletions(-) diff --git a/cmd/internal/collector/collector_connector.go b/cmd/internal/collector/collector_connector.go index fcd9dd1..81a9179 100644 --- a/cmd/internal/collector/collector_connector.go +++ b/cmd/internal/collector/collector_connector.go @@ -115,6 +115,7 @@ func (cc ConnectorCCloudCollector) handleResponse(response QueryResponse, ccmetr } } +// NewConnectorCCloudCollector create a new Confluent Cloud Connector collector func NewConnectorCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) ConnectorCCloudCollector { collector := ConnectorCCloudCollector{ rules: Context.GetConnectorRules(), diff --git a/cmd/internal/collector/collector_kafka.go b/cmd/internal/collector/collector_kafka.go index 331f1a7..8325efe 100644 --- a/cmd/internal/collector/collector_kafka.go +++ b/cmd/internal/collector/collector_kafka.go @@ -130,6 +130,7 @@ func (cc KafkaCCloudCollector) handleResponse(response QueryResponse, ccmetric C } } +// NewKafkaCCloudCollector create a new Confluent Cloud Kafka collector func NewKafkaCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) KafkaCCloudCollector { collector := KafkaCCloudCollector{ rules: Context.GetKafkaRules(), diff --git a/cmd/internal/collector/collector_ksql.go b/cmd/internal/collector/collector_ksql.go index 97d5d8f..fbe72fe 100644 --- a/cmd/internal/collector/collector_ksql.go +++ b/cmd/internal/collector/collector_ksql.go @@ -115,6 +115,7 @@ func (cc KsqlCCloudCollector) handleResponse(response QueryResponse, ccmetric CC } } +// NewKsqlCCloudCollector create a new Confluent Cloud ksql collector func NewKsqlCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) KsqlCCloudCollector { collector := KsqlCCloudCollector{ rules: Context.GetConnectorRules(), diff --git a/cmd/internal/collector/query.go b/cmd/internal/collector/query.go index 2a3664f..ce069fa 
100644 --- a/cmd/internal/collector/query.go +++ b/cmd/internal/collector/query.go @@ -142,7 +142,7 @@ func BuildQuery(metric MetricDescription, clusters []string, groupByLabels []str } } -// BuildConnectorsQuery creates a new Query for a metric for a specific cluster and time interval +// BuildConnectorsQuery creates a new Query for a metric for a set of connectors // This function will return the main global query, override queries will not be generated func BuildConnectorsQuery(metric MetricDescription, connectors []string, resource ResourceDescription) Query { timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized @@ -189,6 +189,8 @@ func BuildConnectorsQuery(metric MetricDescription, connectors []string, resourc } } +// BuildKsqlQuery creates a new Query for a metric for a specific ksql application +// This function will return the main global query, override queries will not be generated func BuildKsqlQuery(metric MetricDescription, ksqlAppIds []string, resource ResourceDescription) Query { timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay @@ -201,11 +203,11 @@ func BuildKsqlQuery(metric MetricDescription, ksqlAppIds []string, resource Reso filters := make([]Filter, 0) connectorFilters := make([]Filter, 0) - for _, ksqlId := range ksqlAppIds { + for _, ksqlID := range ksqlAppIds { connectorFilters = append(connectorFilters, Filter{ Field: "resource.ksql.id", Op: "EQ", - Value: ksqlId, + Value: ksqlID, }) } diff --git a/grafana/ccloud-exporter.json b/grafana/ccloud-exporter.json index a672c54..fce8f50 100644 --- a/grafana/ccloud-exporter.json +++ b/grafana/ccloud-exporter.json @@ -1,8 +1,8 @@ { "__inputs": [ { - "name": "DS_PROMETHEUS-CCLOUD", - "label": 
"prometheus-ccloud", + "name": "DS_PROMETHEUS", + "label": "Prometheus", "description": "", "type": "datasource", "pluginId": "prometheus", @@ -14,7 +14,7 @@ "type": "grafana", "id": "grafana", "name": "Grafana", - "version": "6.7.3" + "version": "7.3.6" }, { "type": "panel", @@ -52,12 +52,12 @@ "gnetId": null, "graphTooltip": 0, "id": null, - "iteration": 1589557541338, + "iteration": 1610467085733, "links": [], "panels": [ { "collapsed": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "gridPos": { "h": 1, "w": 24, @@ -70,7 +70,37 @@ "type": "row" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 10000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 4000 + }, + { + "color": "red", + "value": 10000 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -80,47 +110,22 @@ "id": 19, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "last" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 10000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 4000 - }, - { - "color": "red", - "value": 10000 - } - ] - }, - "unit": "none" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "sum(ccloud_metric_partition_count{cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_partition_count{kafka_id=~\"$cluster\"})", "hide": false, 
"interval": "", "legendFormat": "{{cluster}}", @@ -133,7 +138,37 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 5000000000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 4000000000 + }, + { + "color": "red", + "value": 5000000000 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -143,47 +178,22 @@ "id": 8, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "lastNotNull" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 5000000000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 4000000000 - }, - { - "color": "red", - "value": 5000000000 - } - ] - }, - "unit": "decbytes" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "sum(ccloud_metric_retained_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_retained_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", "legendFormat": "{{cluster}}", @@ -196,7 +206,37 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 25000000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + 
"value": 1000000 + }, + { + "color": "red", + "value": 2500000 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -206,47 +246,22 @@ "id": 10, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "lastNotNull" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 25000000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 1000000 - }, - { - "color": "red", - "value": 2500000 - } - ] - }, - "unit": "decbytes" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "sum(ccloud_metric_sent_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_sent_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "interval": "", "legendFormat": "{{cluster}}", "refId": "A" @@ -258,8 +273,38 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 25000000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 1000000 + }, + { + "color": "red", + "value": 2500000 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -269,47 +314,22 @@ "id": 17, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "mean" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 
25000000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 1000000 - }, - { - "color": "red", - "value": 2500000 - } - ] - }, - "unit": "decbytes" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "sum(ccloud_metric_received_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_received_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "interval": "", "legendFormat": "{{cluster}}", "refId": "A" @@ -321,7 +341,37 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 25000000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 1000000 + }, + { + "color": "red", + "value": 2500000 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -331,47 +381,22 @@ "id": 18, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "mean" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 25000000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 1000000 - }, - { - "color": "red", - "value": 2500000 - } - ] - }, - "unit": "decbytes" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", 
"targets": [ { - "expr": "sum(ccloud_metric_sent_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_sent_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "interval": "", "legendFormat": "{{cluster}}", "refId": "A" @@ -383,7 +408,37 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 25000000, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 1000000 + }, + { + "color": "red", + "value": 2500000 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -393,47 +448,22 @@ "id": 12, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "lastNotNull" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 25000000, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 1000000 - }, - { - "color": "red", - "value": 2500000 - } - ] - }, - "unit": "decbytes" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "sum(ccloud_metric_received_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum(ccloud_metric_received_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "interval": "", "legendFormat": "{{cluster}}", "refId": "A" @@ -445,7 +475,38 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + 
"custom": {}, + "displayName": "", + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 100 + }, + { + "color": "red", + "value": 1000 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -455,48 +516,22 @@ "id": 22, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": [ "max" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 100, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 100 - }, - { - "color": "red", - "value": 1000 - } - ] - }, - "title": "", - "unit": "none" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { - "expr": "avg(ccloud_metric_active_connection_count{cluster_id=~\"$cluster\"})", + "expr": "avg(ccloud_metric_active_connection_count{kafka_id=~\"$cluster\"})", "hide": false, "interval": "", "legendFormat": "{{cluster}}", @@ -509,7 +544,37 @@ "type": "stat" }, { - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": {}, + "mappings": [], + "max": 90, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 30 + }, + { + "color": "red", + "value": 60 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, "gridPos": { "h": 5, "w": 3, @@ -519,44 +584,19 @@ "id": 23, "options": { "colorMode": "value", - "fieldOptions": { + "graphMode": "area", + "justifyMode": "auto", + "orientation": 
"auto", + "reduceOptions": { "calcs": [ "max" ], - "defaults": { - "color": { - "mode": "thresholds" - }, - "mappings": [], - "max": 90, - "min": 0, - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "#EAB839", - "value": 30 - }, - { - "color": "red", - "value": 60 - } - ] - }, - "unit": "s" - }, - "overrides": [], + "fields": "", "values": false }, - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto" + "textMode": "auto" }, - "pluginVersion": "6.7.3", + "pluginVersion": "7.3.6", "targets": [ { "expr": "avg(ccloud_metrics_api_request_latency)", @@ -573,7 +613,7 @@ }, { "collapsed": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "gridPos": { "h": 1, "w": 24, @@ -590,8 +630,15 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The current count of bytes retained by the cluster, summed across all partitions. 
The count is sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -619,9 +666,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -631,10 +679,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum by (cluster_id, topic) (ccloud_metric_retained_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum by (kafka_id, topic) (ccloud_metric_retained_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", - "legendFormat": "{{cluster_id}}-{{topic}}", + "legendFormat": "{{kafka_id}}-{{topic}}", "refId": "A" } ], @@ -685,8 +733,15 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The delta count of bytes received from the network. Each sample is the number of bytes received since the previous data sample. 
The count is sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -710,9 +765,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -722,10 +778,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum by (topic, cluster_id) (ccloud_metric_received_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum by (topic, kafka_id) (ccloud_metric_received_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", - "legendFormat": "{{topic}} - {{cluster_id}}", + "legendFormat": "{{topic}} - {{kafka_id}}", "refId": "A" } ], @@ -776,8 +832,15 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The delta count of bytes sent over the network. Each sample is the number of bytes sent since the previous data point. 
The count is sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -803,9 +866,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -815,10 +879,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum by (topic, cluster_id) (ccloud_metric_sent_bytes{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum by (topic, kafka_id) (ccloud_metric_sent_bytes{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", - "legendFormat": "{{topic}} - {{cluster_id}}", + "legendFormat": "{{topic}} - {{kafka_id}}", "refId": "A" } ], @@ -868,8 +932,15 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The delta count of records received. Each sample is the number of records received since the previous data sample. 
The count is sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -893,9 +964,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -905,10 +977,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum by (topic, cluster_id) (ccloud_metric_received_records{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum by (topic, kafka_id) (ccloud_metric_received_records{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", - "legendFormat": "{{topic}} - {{cluster_id}}", + "legendFormat": "{{topic}} - {{kafka_id}}", "refId": "A" } ], @@ -959,8 +1031,15 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The delta count of records sent. Each sample is the number of records sent since the previous data point. 
The count is sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -984,9 +1063,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -996,10 +1076,10 @@ "steppedLine": false, "targets": [ { - "expr": "sum by (topic, cluster_id) (ccloud_metric_sent_records{topic=~\"$topic\", cluster_id=~\"$cluster\"})", + "expr": "sum by (topic, kafka_id) (ccloud_metric_sent_records{topic=~\"$topic\", kafka_id=~\"$cluster\"})", "hide": false, "interval": "", - "legendFormat": "{{topic}} - {{cluster_id}}", + "legendFormat": "{{topic}} - {{kafka_id}}", "refId": "A" } ], @@ -1050,8 +1130,15 @@ "bars": true, "dashLength": 10, "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "description": "The delta count of requests received over the network. Each sample is the number of requests received since the previous data point. 
The count sampled every 60 seconds.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, "fill": 1, "fillGradient": 0, "gridPos": { @@ -1079,9 +1166,10 @@ "linewidth": 1, "nullPointMode": "null", "options": { - "dataLinks": [] + "alertThreshold": true }, "percentage": false, + "pluginVersion": "7.3.6", "pointradius": 2, "points": false, "renderer": "flot", @@ -1091,10 +1179,10 @@ "steppedLine": false, "targets": [ { - "expr": "ccloud_metric_request_count{cluster_id=~\"$cluster\"}", + "expr": "ccloud_metric_request_count{kafka_id=~\"$cluster\"}", "hide": false, "interval": "", - "legendFormat": "{{cluster_id}}-{{type}}", + "legendFormat": "{{kafka_id}}-{{type}}", "refId": "A" } ], @@ -1141,302 +1229,964 @@ } }, { - "collapsed": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "collapsed": true, + "datasource": null, "gridPos": { "h": 1, "w": 24, "x": 0, "y": 36 }, - "id": 25, - "panels": [], - "title": "Metrics API", - "type": "row" - }, - { - "alert": { - "alertRuleTags": {}, - "conditions": [ - { - "evaluator": { - "params": [ - 30 + "id": 33, + "panels": [ + { + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 11, + "x": 0, + "y": 8 + }, + "id": 39, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "mean" ], - "type": "gt" + "fields": "", + "values": false }, - "operator": { - "type": "and" + "textMode": "auto" + }, + "pluginVersion": "7.3.6", + "targets": [ + { + "expr": "ccloud_metric_connector_dead_letter_queue_records{connector_kafka_id=~\"$cluster\"}", + "interval": "", + "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", + 
"queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Dead letter queue records", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } }, - "query": { - "params": [ - "A", - "5m", - "now" - ] + "overrides": [] + }, + "gridPos": { + "h": 3, + "w": 7, + "x": 11, + "y": 8 + }, + "id": 40, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false }, - "reducer": { - "params": [], - "type": "max" + "textMode": "auto" + }, + "pluginVersion": "7.3.6", + "targets": [ + { + "expr": "count(ccloud_metric_connector_dead_letter_queue_records{connector_kafka_id=~\"$cluster\"})", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Number of connector", + "type": "stat" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} }, - "type": "query" + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 11 + }, + "hiddenSeries": false, + "id": 35, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": 
"ccloud_metric_connector_sent_records{connector_kafka_id=~\"$cluster\"}", + "interval": "", + "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sent records", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null } - ], - "executionErrorState": "alerting", - "for": "5m", - "frequency": "1m", - "handler": 1, - "name": "Metrics API latency alert", - "noDataState": "no_data", - "notifications": [] - }, - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 37 - }, - "hiddenSeries": false, - "id": 27, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "options": { - "dataLinks": [] - }, - "percentage": false, - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "ccloud_metrics_api_request_latency", - "interval": "", - "legendFormat": "{{metric}}", - "refId": "A" - } - ], - "thresholds": [ - { - "colorMode": "critical", - "fill": true, - "line": true, - "op": "gt", - "value": 30 - } - ], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Metrics 
API latency", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ + }, { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 11 + }, + "hiddenSeries": false, + "id": 38, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ccloud_metric_connector_received_bytes{connector_kafka_id=~\"$cluster\"}", + "interval": "", + "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Received bytes", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } }, { - "format": "s", - "label": null, - "logBase": 1, 
- "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, - { - "alert": { - "alertRuleTags": {}, - "conditions": [ - { - "evaluator": { - "params": [ - 50 - ], - "type": "gt" + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} }, - "operator": { - "type": "and" + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 20 + }, + "hiddenSeries": false, + "id": 37, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ccloud_metric_connector_sent_bytes{connector_kafka_id=~\"$cluster\"}", + "interval": "", + "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sent bytes", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true }, - "query": { - "params": [ - "A", - "5m", - "now" - ] + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": 
false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {} }, - "reducer": { - "params": [], - "type": "max" + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 20 + }, + "hiddenSeries": false, + "id": 36, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ccloud_metric_connector_received_records{connector_kafka_id=~\"$cluster\"}", + "interval": "", + "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Received records", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true }, - "type": "query" + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null } - ], - "executionErrorState": "alerting", - "for": "5m", - "frequency": "1m", - "handler": 1, - "name": "Metrics API latency alert", - "noDataState": "no_data", - "notifications": [] - }, - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_PROMETHEUS-CCLOUD}", - "fill": 1, - "fillGradient": 0, + } + ], + "title": "Connectors", + "type": "row" + 
}, + { + "collapsed": true, + "datasource": null, "gridPos": { - "h": 8, - "w": 12, - "x": 12, + "h": 1, + "w": 24, + "x": 0, "y": 37 }, - "hiddenSeries": false, - "id": 28, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "options": { - "dataLinks": [] - }, - "percentage": false, - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ + "id": 42, + "panels": [ { - "expr": "scrape_duration_seconds{job=\"ccloudexporter\"}", - "interval": "", - "legendFormat": "{{instance}}", - "refId": "A" - } - ], - "thresholds": [ + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 0, + "y": 9 + }, + "id": 44, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "7.3.6", + "targets": [ + { + "expr": "ccloud_metric_ksql_streaming_unit_count", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "ksqlDB streaming unit", + "type": "stat" + }, { - "colorMode": "critical", - "fill": true, - "line": true, - "op": "gt", - "value": 50, - "yaxis": "left" + "datasource": "${DS_PROMETHEUS}", + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + 
}, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 4, + "y": 9 + }, + "id": 45, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "7.3.6", + "targets": [ + { + "expr": "count(ccloud_metric_ksql_streaming_unit_count)", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "ksqlDB application count", + "type": "stat" } ], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Scrape duration", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] + "title": "ksqlDB", + "type": "row" + }, + { + "collapsed": true, + "datasource": "${DS_PROMETHEUS}", + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 38 }, - "yaxes": [ + "id": 25, + "panels": [ { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "alert": { + "alertRuleTags": {}, + "conditions": [ + { + "evaluator": { + "params": [ + 30 + ], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": [ + "A", + "5m", + "now" + ] + }, + "reducer": { + "params": [], + "type": "max" + }, + "type": "query" + } + ], + "executionErrorState": "alerting", + "for": "5m", + "frequency": "1m", + "handler": 1, + "name": "Metrics API latency alert", + "noDataState": "no_data", + "notifications": [] + }, + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { 
+ "h": 8, + "w": 12, + "x": 0, + "y": 10 + }, + "hiddenSeries": false, + "id": 27, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "ccloud_metrics_api_request_latency", + "interval": "", + "legendFormat": "{{metric}}", + "refId": "A" + } + ], + "thresholds": [ + { + "colorMode": "critical", + "fill": true, + "line": true, + "op": "gt", + "value": 30 + } + ], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Metrics API latency", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } }, { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true + "alert": { + "alertRuleTags": {}, + "conditions": [ + { + "evaluator": { + "params": [ + 50 + ], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": [ + "A", + "5m", + "now" + ] + }, + "reducer": { + "params": [], + "type": "max" + }, + "type": "query" + } + ], + "executionErrorState": "alerting", + "for": "5m", + "frequency": "1m", + "handler": 1, + "name": "Metrics API latency alert", + "noDataState": "no_data", + "notifications": [] + }, + "aliasColors": {}, + "bars": false, + 
"dashLength": 10, + "dashes": false, + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 10 + }, + "hiddenSeries": false, + "id": 28, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "scrape_duration_seconds{job=\"ccloudexporter\"}", + "interval": "", + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [ + { + "colorMode": "critical", + "fill": true, + "line": true, + "op": "gt", + "value": 50 + } + ], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Scrape duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], - "yaxis": { - "align": false, - "alignLevel": null - } + "title": "Metrics API", + "type": "row" } ], "refresh": "5s", - "schemaVersion": 22, + "schemaVersion": 26, "style": "dark", "tags": [], "templating": { "list": [ { - "allValue": "", + "allValue": ".*", "current": {}, - "datasource": "${DS_PROMETHEUS-CCLOUD}", - "definition": "label_values(ccloud_metric_retained_bytes, cluster_id)", + 
"datasource": "${DS_PROMETHEUS}", + "definition": "label_values(ccloud_metric_retained_bytes, kafka_id)", + "error": null, "hide": 0, "includeAll": true, - "index": -1, "label": "Cluster", "multi": true, "name": "cluster", "options": [], - "query": "label_values(ccloud_metric_retained_bytes, cluster_id)", + "query": "label_values(ccloud_metric_retained_bytes, kafka_id)", "refresh": 1, "regex": "", "skipUrlSync": false, @@ -1450,11 +2200,11 @@ { "allValue": "", "current": {}, - "datasource": "${DS_PROMETHEUS-CCLOUD}", + "datasource": "${DS_PROMETHEUS}", "definition": "label_values(ccloud_metric_retained_bytes, topic)", + "error": null, "hide": 0, "includeAll": true, - "index": -1, "label": "Topics", "multi": true, "name": "topic", @@ -1493,8 +2243,5 @@ "timezone": "", "title": "Confluent Cloud", "uid": "fhX5408Zk", - "variables": { - "list": [] - }, - "version": 12 + "version": 5 } From 0641ccae757da4a1b2d02acea1a027983122818d Mon Sep 17 00:00:00 2001 From: Damien Gasparina Date: Thu, 28 Jan 2021 12:13:39 +0100 Subject: [PATCH 3/5] Readme, bug fixes & updated Grafana dashboard --- README.md | 32 ++++++++++++++++++++++------ cmd/internal/collector/context.go | 2 +- cmd/internal/collector/option.go | 14 ++++++++++++- config/config.connector.yaml | 32 ++++++++++++++++++++++++++++ config/config.ksqldb.yaml | 35 +++++++++++++++++++++++++++++++ config/config.partition.yaml | 4 ++-- config/config.simple.yaml | 2 +- grafana/ccloud-exporter.json | 2 +- 8 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 config/config.connector.yaml create mode 100644 config/config.ksqldb.yaml diff --git a/README.md b/README.md index 46ae093..41c7804 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # Important information -In previous versions, it was possible to rely on username/password to authenticate to Confluent Cloud. -Nowadays, only the API key/secret is officially supported to connect to the Metrics API. 
- -To ensure backward compatibility, previous environment variables are still available. -Nonetheless, username/password is now **deprecated** and you **must** rely on API key/secret. +The exporter has been upgraded to leverage the V2 of the Metrics API. +In the V2, the `cluster_id` label has been renamed to `kafka_id`. +Upgrading to the latest version of the exporter will certainly break your dashboard. +You need to rename `cluster_id` to `kafka_id` in your dashboards. # Prometheus exporter for Confluent Cloud Metrics API @@ -124,7 +123,9 @@ If you do not provide a configuration file, the exporter creates one from the pr | Key | Description | |--------------------|---------------------------------------------------------------------------------------------------------------| -| rules.clusters | List of clusters to fetch metrics from | +| rules.clusters | List of Kafka clusters to fetch metrics for | +| rules.connectors | List of connectors to fetch metrics for | +| rules.ksqls | List of ksqlDB applications to fetch metrics for | | rules.labels | Labels to exposed to Prometheus and group by in the query | | rules.topics | Optional list of topics to filter the metrics | | rules.metrics | List of metrics to gather | @@ -183,3 +184,22 @@ go get github.com/Dabz/ccloudexporter/cmd/ccloudexporter A Grafana dashboard is provided in [./grafana/](./grafana) folder. ![Grafana Screenshot](./grafana/grafana.png) + +# Deprecated configuration + +## cluster_id is deprecated + +Metrics API v1 exposed the ID of the cluster with the label `label.cluster_id`, and it was exposed as `cluster_id` by the exporter. +In the V2, this label has been renamed to `resource.kafka.id`. It is now exposed by the exporter as `kafka_id` instead. + +Upgrading to the latest version of the exporter will certainly break your dashboard as you will need to rename `cluster_id` to `kafka_id`. 
+ +## Username/Password authentication is deprecated + +In previous versions, it was possible to rely on username/password to authenticate to Confluent Cloud. +Nowadays, only the API key/secret is officially supported to connect to the Metrics API. + +To ensure backward compatibility, previous environment variables are still available. +Nonetheless, username/password is now **deprecated** and you **must** rely on API key/secret. + + diff --git a/cmd/internal/collector/context.go b/cmd/internal/collector/context.go index 7a4f6ea..6ddce96 100644 --- a/cmd/internal/collector/context.go +++ b/cmd/internal/collector/context.go @@ -50,7 +50,7 @@ var Context = ExporterContext{} // DefaultGroupingLabels is the default value for groupBy.labels var DefaultGroupingLabels = []string{ - "cluster_id", + "kafka.id", "topic", "type", } diff --git a/cmd/internal/collector/option.go b/cmd/internal/collector/option.go index 682b2bf..2445429 100644 --- a/cmd/internal/collector/option.go +++ b/cmd/internal/collector/option.go @@ -146,7 +146,7 @@ func parseConfigFile(configPath string) { viper.UnmarshalKey("rules", &Context.Rules) for i, rule := range Context.Rules { rule.id = i - Context.Rules[i] = rule + Context.Rules[i] = upgradeRuleIfRequired(rule) } } @@ -160,6 +160,18 @@ func createDefaultRule(cluster string) { } } +func upgradeRuleIfRequired(rule Rule) Rule { + for i, labelsToGroupBy := range(rule.GroupByLabels) { + // In Metrics API v2, label.cluster_id has been replaced by + // ressource.kafka.id + if labelsToGroupBy == "cluster_id" { + rule.GroupByLabels[i] = "kafka.id" + } + } + + return rule +} + func setStringIfExit(destination *string, key string) { if viper.Get(key) != nil { *destination = viper.GetString(key) diff --git a/config/config.connector.yaml b/config/config.connector.yaml new file mode 100644 index 0000000..11be85e --- /dev/null +++ b/config/config.connector.yaml @@ -0,0 +1,32 @@ +config: + http: + baseurl: https://api.telemetry.confluent.cloud/ + timeout: 60 + 
listener: 0.0.0.0:2112 + noTimestamp: false + delay: 120 + granularity: PT1M +rules: + - clusters: + - lkc-xxxxx + connectors: + - lcc-xxxxx + metrics: + - io.confluent.kafka.server/received_bytes + - io.confluent.kafka.server/sent_bytes + - io.confluent.kafka.server/received_records + - io.confluent.kafka.server/sent_records + - io.confluent.kafka.server/retained_bytes + - io.confluent.kafka.server/active_connection_count + - io.confluent.kafka.server/request_count + - io.confluent.kafka.server/partition_count + - io.confluent.kafka.server/successful_authentication_count + - io.confluent.kafka.connect/sent_bytes + - io.confluent.kafka.connect/received_bytes + - io.confluent.kafka.connect/received_records + - io.confluent.kafka.connect/sent_records + - io.confluent.kafka.connect/dead_letter_queue_records + labels: + - kafka.id + - topic + - type diff --git a/config/config.ksqldb.yaml b/config/config.ksqldb.yaml new file mode 100644 index 0000000..7a73efe --- /dev/null +++ b/config/config.ksqldb.yaml @@ -0,0 +1,35 @@ +config: + http: + baseurl: https://api.telemetry.confluent.cloud/ + timeout: 60 + listener: 0.0.0.0:2112 + noTimestamp: false + delay: 120 + granularity: PT1M +rules: + - clusters: + - lkc-xxxxx + connectors: + - lcc-xxxxx + ksqls: + - lksqlc-xxxxx + metrics: + - io.confluent.kafka.server/received_bytes + - io.confluent.kafka.server/sent_bytes + - io.confluent.kafka.server/received_records + - io.confluent.kafka.server/sent_records + - io.confluent.kafka.server/retained_bytes + - io.confluent.kafka.server/active_connection_count + - io.confluent.kafka.server/request_count + - io.confluent.kafka.server/partition_count + - io.confluent.kafka.server/successful_authentication_count + - io.confluent.kafka.connect/sent_bytes + - io.confluent.kafka.connect/received_bytes + - io.confluent.kafka.connect/received_records + - io.confluent.kafka.connect/sent_records + - io.confluent.kafka.connect/dead_letter_queue_records + - 
io.confluent.kafka.ksql/streaming_unit_count + labels: + - kafka.id + - topic + - type diff --git a/config/config.partition.yaml b/config/config.partition.yaml index 2d2adee..ebdea72 100644 --- a/config/config.partition.yaml +++ b/config/config.partition.yaml @@ -20,7 +20,7 @@ rules: - io.confluent.kafka.server/partition_count - io.confluent.kafka.server/successful_authentication_count labels: - - cluster_id + - kafka.id - topic - type - clusters: @@ -31,7 +31,7 @@ rules: metrics: - io.confluent.kafka.server/retained_bytes labels: - - cluster_id + - kafka.id - topic - type - partition diff --git a/config/config.simple.yaml b/config/config.simple.yaml index bb8bd38..46dc036 100644 --- a/config/config.simple.yaml +++ b/config/config.simple.yaml @@ -20,6 +20,6 @@ rules: - io.confluent.kafka.server/partition_count - io.confluent.kafka.server/successful_authentication_count labels: - - cluster_id + - kafka.id - topic - type diff --git a/grafana/ccloud-exporter.json b/grafana/ccloud-exporter.json index fce8f50..08cf8f9 100644 --- a/grafana/ccloud-exporter.json +++ b/grafana/ccloud-exporter.json @@ -1689,7 +1689,7 @@ "steppedLine": false, "targets": [ { - "expr": "ccloud_metric_connector_received_records{connector_kafka_id=~\"$cluster\"}}", + "expr": "ccloud_metric_connector_received_records{connector_kafka_id=~\"$cluster\"}", "interval": "", "legendFormat": "{{connector_kafka_id}} -- {{connector_id}}", "queryType": "randomWalk", From 2d9569729ae6c55d71b84a02371741e71ed6eb0f Mon Sep 17 00:00:00 2001 From: Damien Gasparina Date: Thu, 28 Jan 2021 15:44:52 +0100 Subject: [PATCH 4/5] Readme updated --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 41c7804..b7e7341 100644 --- a/README.md +++ b/README.md @@ -160,7 +160,7 @@ rules: - io.confluent.kafka.server/partition_count - io.confluent.kafka.server/successful_authentication_count labels: - - cluster_id + - kafka_id - topic - type ``` From 
188e940517cd326124d65623e8a725a45fb6b2b6 Mon Sep 17 00:00:00 2001 From: Damien Gasparina Date: Thu, 28 Jan 2021 16:10:21 +0100 Subject: [PATCH 5/5] Duplicating kafka_id as cluster_id && Readme updated --- README.md | 13 +++---------- cmd/internal/collector/collector_kafka.go | 8 ++++++++ cmd/internal/collector/option.go | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index b7e7341..1e22dee 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,3 @@ -# Important information - -The exporter has been upgraded to leverage the V2 of the Metrics API. -In the V2, `cluster_id` labels has been renamed `kafka_id`. -Upgrading to the latest version of the exporter will certainly break your dashboard. -You need to rename `cluster_id` to `kafka_id` in your dashboards. - # Prometheus exporter for Confluent Cloud Metrics API A simple prometheus exporter that can be used to extract metrics from [Confluent Cloud Metric API](https://docs.confluent.io/current/cloud/metrics-api.html). @@ -189,10 +182,10 @@ A Grafana dashboard is provided in [./grafana/](./grafana) folder. ## cluster_id is deprecated -Metrics API v1 exposed the ID of the cluster with the label `label.cluster_id`, and it was exposed as `cluster_id` by the exporter. -In the V2, this label has been renamed to `resource.kafka.id`. It is now exposed by the exporter as `kafka_id` instead. +Historically, the exporter and the Metrics API exposed the ID of the cluster with the label `cluster_id`. +In the Metrics API V2, this label has been renamed to `resource.kafka.id`. It is now exposed by the exporter as `kafka_id` instead. -Upgrading to the latest version of the exporter will certainly break your dashboard as you will need to rename `cluster_id` to `kafka_id`. +To avoid breaking previous dashboard, the exporter is exposing, for the moment, the ID of the cluster as `cluster_id` and `kafka_id`. 
## Username/Password authentication is deprecated diff --git a/cmd/internal/collector/collector_kafka.go b/cmd/internal/collector/collector_kafka.go index 8325efe..6d32933 100644 --- a/cmd/internal/collector/collector_kafka.go +++ b/cmd/internal/collector/collector_kafka.go @@ -101,6 +101,10 @@ func (cc KafkaCCloudCollector) handleResponse(response QueryResponse, ccmetric C labels := []string{} for _, label := range ccmetric.labels { + // For compatibility reason, kafka_id label is also added as cluster_id + if label == "cluster_id" { + label = "kafka_id" + } name := cc.resource.datapointFieldNameForLabel(label) labelValue, labelValuePresent := dataPoint[name].(string) if !labelValuePresent { @@ -152,6 +156,10 @@ func NewKafkaCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceD var labels []string for _, rsrcLabel := range resource.Labels { labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key)) + // For retro-compatibility, kafka_id is also exposed as cluster_id + if rsrcLabel.Key == "kafka.id" { + labels = append(labels, "cluster_id") + } } for _, metrLabel := range metr.Labels { labels = append(labels, metrLabel.Key) diff --git a/cmd/internal/collector/option.go b/cmd/internal/collector/option.go index 2445429..7057dd7 100644 --- a/cmd/internal/collector/option.go +++ b/cmd/internal/collector/option.go @@ -161,7 +161,7 @@ func createDefaultRule(cluster string) { } func upgradeRuleIfRequired(rule Rule) Rule { - for i, labelsToGroupBy := range(rule.GroupByLabels) { + for i, labelsToGroupBy := range rule.GroupByLabels { // In Metrics API v2, label.cluster_id has been replaced by // ressource.kafka.id if labelsToGroupBy == "cluster_id" {