Migration to Metrics API V2 (#59)
* Migration to Metrics API v2
Dabz authored Jan 28, 2021
1 parent 95c8540 commit 921aec3
Showing 20 changed files with 2,275 additions and 748 deletions.
33 changes: 23 additions & 10 deletions README.md
@@ -1,11 +1,3 @@
# Important information

In previous versions, it was possible to authenticate to Confluent Cloud with a username and password.
Only an API key/secret pair is now officially supported for connecting to the Metrics API.

To ensure backward compatibility, the previous environment variables are still honored.
Nonetheless, username/password authentication is now **deprecated** and you **must** rely on an API key/secret.

# Prometheus exporter for Confluent Cloud Metrics API

A simple Prometheus exporter that can be used to extract metrics from the [Confluent Cloud Metrics API](https://docs.confluent.io/current/cloud/metrics-api.html).
@@ -124,7 +116,9 @@ If you do not provide a configuration file, the exporter creates one from the pr

| Key | Description |
|--------------------|---------------------------------------------------------------------------------------------------------------|
| rules.clusters | List of clusters to fetch metrics from |
| rules.clusters | List of Kafka clusters to fetch metrics for |
| rules.connectors | List of connectors to fetch metrics for |
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
| rules.labels | Labels to expose to Prometheus and to group by in the query |
| rules.topics | Optional list of topics to filter the metrics |
| rules.metrics | List of metrics to gather |
@@ -159,7 +153,7 @@ rules:
- io.confluent.kafka.server/partition_count
- io.confluent.kafka.server/successful_authentication_count
labels:
- cluster_id
- kafka_id
- topic
- type
```
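
For instance, a configuration that also scrapes connectors and ksqlDB applications via the new `rules.connectors` and `rules.ksqls` keys could look like the sketch below. The resource IDs are hypothetical placeholders, and the `connector_id`/`ksql_id` label names are assumptions to be checked against the exporter's output:

```yaml
rules:
  - clusters:
      - lkc-abc123                           # hypothetical Kafka cluster ID
    metrics:
      - io.confluent.kafka.server/received_bytes
    labels:
      - kafka_id
      - topic
  - connectors:
      - lcc-def456                           # hypothetical connector ID
    metrics:
      - io.confluent.kafka.connect/sent_bytes
    labels:
      - connector_id                         # assumed label name
  - ksqls:
      - lksqlc-ghi789                        # hypothetical ksqlDB application ID
    metrics:
      - io.confluent.ksql/streaming_unit_count
    labels:
      - ksql_id                              # assumed label name
```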
@@ -183,3 +177,22 @@ go get github.com/Dabz/ccloudexporter/cmd/ccloudexporter
A Grafana dashboard is provided in the [./grafana/](./grafana) folder.
![Grafana Screenshot](./grafana/grafana.png)
# Deprecated configuration
## cluster_id is deprecated
Historically, the exporter and the Metrics API exposed the ID of the cluster with the label `cluster_id`.
In the Metrics API V2, this label has been renamed to `resource.kafka.id`. It is now exposed by the exporter as `kafka_id` instead.
To avoid breaking existing dashboards, the exporter exposes, for the moment, the cluster ID as both `cluster_id` and `kafka_id`; new dashboards should rely on `kafka_id`.
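For example, a dashboard query migrated from the deprecated label to the new one might change as follows; the metric name `ccloud_metric_received_bytes` and the cluster ID are hypothetical placeholders:

```promql
# Before, with the deprecated label (hypothetical metric name and cluster ID):
sum by (topic) (ccloud_metric_received_bytes{cluster_id="lkc-abc123"})

# After, with the Metrics API V2 label:
sum by (topic) (ccloud_metric_received_bytes{kafka_id="lkc-abc123"})
```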
## Username/Password authentication is deprecated
In previous versions, it was possible to authenticate to Confluent Cloud with a username and password.
Only an API key/secret pair is now officially supported for connecting to the Metrics API.
To ensure backward compatibility, the previous environment variables are still honored.
Nonetheless, username/password authentication is now **deprecated** and you **must** rely on an API key/secret.
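
For illustration, running the exporter with an API key/secret typically means exporting the credentials as environment variables first. The variable names and the `-cluster` flag below are assumptions to be verified against the exporter's documentation:

```sh
# Assumed variable names and flag; values are placeholders.
export CCLOUD_API_KEY="ABCDEFGHIJKLMNOP"
export CCLOUD_API_SECRET="****************"
./ccloudexporter -cluster lkc-abc123
```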
2 changes: 1 addition & 1 deletion cmd/ccloudexporter/ccloudexporter.go
@@ -11,8 +11,8 @@ import (
"github.com/Dabz/ccloudexporter/cmd/internal/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
log "github.com/sirupsen/logrus"
"net/http"
)

func main() {
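The rest of `main` is truncated in this view. For context, a minimal sketch of how a custom collector is usually wired up with these imports follows; the listen address and the omission of configuration parsing are assumptions, not code from this commit:

```go
package main

import (
	"net/http"

	"github.com/Dabz/ccloudexporter/cmd/internal/collector"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	log "github.com/sirupsen/logrus"
)

func main() {
	// Configuration parsing is omitted in this sketch; the real main
	// sets up the collector Context before this point.
	cc := collector.NewCCloudCollector()

	// Register the custom collector with the default Prometheus registry.
	prometheus.MustRegister(cc)

	// Expose the metrics endpoint; the :2112 address is an assumption.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatalln(http.ListenAndServe(":2112", nil))
}
```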
183 changes: 50 additions & 133 deletions cmd/internal/collector/collector.go
@@ -8,9 +8,7 @@ package collector
//

import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

@@ -31,161 +29,80 @@ type CCloudCollectorMetric struct {
// CCloudCollector is a custom Prometheus collector to collect data from
// Confluent Cloud Metrics API
type CCloudCollector struct {
metrics map[string]CCloudCollectorMetric
rules []Rule
}

// Describe collects all metrics for ccloudexporter
func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
for _, desc := range cc.metrics {
ch <- desc.desc
desc.duration.Describe(ch)
}
metrics map[string]CCloudCollectorMetric
rules []Rule
kafkaCollector *KafkaCCloudCollector
connectorCollector *ConnectorCCloudCollector
ksqlCollector *KsqlCCloudCollector
}

var (
httpClient http.Client
)

// Describe collects all metrics for ccloudexporter
func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
cc.kafkaCollector.Describe(ch)
cc.connectorCollector.Describe(ch)
cc.ksqlCollector.Describe(ch)
}

// Collect all metrics for Prometheus.
// To avoid reaching the scrape_timeout, metrics are fetched in multiple goroutines
func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
var wg sync.WaitGroup
for _, rule := range cc.rules {
for _, metric := range rule.Metrics {
wg.Add(1)
go cc.CollectMetricsForRule(&wg, ch, rule, cc.metrics[metric])
}
}

cc.kafkaCollector.Collect(ch, &wg)
cc.connectorCollector.Collect(ch, &wg)
cc.ksqlCollector.Collect(ch, &wg)
wg.Wait()
}

// CollectMetricsForRule collects all metrics for a specific rule
func (cc CCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) {
defer wg.Done()
query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics)
log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created")
optimizedQuery, additionalLabels := OptimizeQuery(query)
log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized")
durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id))
timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set))
response, err := SendQuery(optimizedQuery)
timer.ObserveDuration()
ch <- durationMetric
if err != nil {
log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed")
return
}
log.WithFields(log.Fields{"response": response}).Traceln("Response has been received")
cc.handleResponse(response, ccmetric, ch, rule, additionalLabels)
}

func (cc CCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) {
desc := ccmetric.desc
for _, dataPoint := range response.Data {
// Some data points might need to be ignored if this is the global query
topic, topicPresent := dataPoint["metric.label.topic"].(string)
cluster, clusterPresent := dataPoint["metric.label.cluster_id"].(string)

if !clusterPresent {
cluster, clusterPresent = additionalLabels["metric.label.cluster_id"]
}

if !topicPresent {
topic, topicPresent = additionalLabels["metric.label.topic"]
}

if topicPresent && clusterPresent && rule.ShouldIgnoreResultForRule(topic, cluster, ccmetric.metric.Name) {
continue
}

value, ok := dataPoint["value"].(float64)
if !ok {
log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float")
return
}

labels := []string{}
for _, label := range ccmetric.labels {
labelValue, labelValuePresent := dataPoint["metric.label."+label].(string)
if !labelValuePresent {
labelValue, labelValuePresent = additionalLabels["metric.label."+label]
}
labels = append(labels, labelValue)
}

metric := prometheus.MustNewConstMetric(
desc,
prometheus.GaugeValue,
value,
labels...,
)

if Context.NoTimestamp {
ch <- metric
} else {
timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"]))
if err != nil {
log.WithError(err).Errorln("Can not parse timestamp, ignoring the response")
return
}
metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric)
ch <- metricWithTime
}
}
}

// NewCCloudCollector creates a new instance of the collector
// During the creation, we invoke the descriptor endpoint to fetch all
// existing metrics and their labels
func NewCCloudCollector() CCloudCollector {
collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
descriptorResponse := SendDescriptorQuery()
log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received")
mapOfWhiteListedMetrics := Context.GetMapOfMetrics()

for _, metr := range descriptorResponse.Data {
_, metricPresent := mapOfWhiteListedMetrics[metr.Name]
if !metricPresent {
continue
}
delete(mapOfWhiteListedMetrics, metr.Name)
metr.Labels = append(metr.Labels, MetricLabel{Key: "cluster_id", Description: "Cluster ID"})
var labels []string
for _, metrLabel := range metr.Labels {
labels = append(labels, metrLabel.Key)
}

desc := prometheus.NewDesc(
"ccloud_metric_"+GetNiceNameForMetric(metr),
metr.Description,
labels,
nil,
)

requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "ccloud_metrics_api_request_latency",
Help: "Metrics API request latency",
ConstLabels: map[string]string{"metric": metr.Name},
}, []string{"ruleNumber"})

metric := CCloudCollectorMetric{
metric: metr,
desc: desc,
duration: requestDuration,
labels: labels,
log.Traceln("Creating http client")
httpClient = http.Client{
Timeout: time.Second * time.Duration(Context.HTTPTimeout),
}

var (
connectorResource ResourceDescription
kafkaResource ResourceDescription
ksqlResource ResourceDescription
)
resourceDescription := SendResourceDescriptorQuery()
for _, resource := range resourceDescription.Data {
if resource.Type == "connector" {
connectorResource = resource
} else if resource.Type == "kafka" {
kafkaResource = resource
} else if resource.Type == "ksql" {
ksqlResource = resource
}
collector.metrics[metr.Name] = metric
}

httpClient = http.Client{
Timeout: time.Second * time.Duration(Context.HTTPTimeout),
if connectorResource.Type == "" {
log.WithField("descriptorResponse", resourceDescription).Fatalln("No connector resource available")
}

if len(mapOfWhiteListedMetrics) > 0 {
log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API")
if kafkaResource.Type == "" {
log.WithField("descriptorResponse", resourceDescription).Fatalln("No kafka resource available")
}

if ksqlResource.Type == "" {
log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available")
}

collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource)
connectorCollector := NewConnectorCCloudCollector(collector, connectorResource)
ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource)

collector.kafkaCollector = &kafkaCollector
collector.connectorCollector = &connectorCollector
collector.ksqlCollector = &ksqlCollector

return collector
}