This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Migration to Metrics API V2 #59

Merged · 5 commits · Jan 28, 2021
33 changes: 23 additions & 10 deletions README.md
@@ -1,11 +1,3 @@
# Important information

In previous versions, it was possible to rely on username/password to authenticate to Confluent Cloud.
Nowadays, only the API key/secret is officially supported to connect to the Metrics API.

To ensure backward compatibility, previous environment variables are still available.
Nonetheless, username/password is now **deprecated** and you **must** rely on API key/secret.

# Prometheus exporter for Confluent Cloud Metrics API

A simple Prometheus exporter that extracts metrics from the [Confluent Cloud Metrics API](https://docs.confluent.io/current/cloud/metrics-api.html).
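
As a quick sketch of typical use, the invocation below is hypothetical: the `lkc-abc123` cluster ID is a placeholder and the `:2112` port assumes the exporter's default listener.

```
$ ccloudexporter -cluster lkc-abc123
$ curl http://localhost:2112/metrics
```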
@@ -124,7 +116,9 @@ If you do not provide a configuration file, the exporter creates one from the pr

| Key | Description |
|--------------------|---------------------------------------------------------------------------------------------------------------|
| rules.clusters | List of clusters to fetch metrics from |
| rules.clusters | List of Kafka clusters to fetch metrics for |
| rules.connectors | List of connectors to fetch metrics for |
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
| rules.labels       | Labels to expose to Prometheus and to group by in the query                                                    |
| rules.topics | Optional list of topics to filter the metrics |
| rules.metrics | List of metrics to gather |
@@ -159,7 +153,7 @@ rules:
      - io.confluent.kafka.server/partition_count
      - io.confluent.kafka.server/successful_authentication_count
    labels:
      - cluster_id
      - kafka_id
      - topic
      - type
```
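
With the new resource types from the table above, a rule can also target connectors and ksqlDB applications. The following is a minimal sketch, assuming the same list-based schema as the example above; all resource IDs are hypothetical placeholders:

```
rules:
  - clusters:
      - lkc-abc123
    connectors:
      - lcc-def456
    ksqls:
      - lksqlc-ghi789
    metrics:
      - io.confluent.kafka.server/received_bytes
    labels:
      - kafka_id
      - topic
```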
@@ -183,3 +177,22 @@ go get github.com/Dabz/ccloudexporter/cmd/ccloudexporter
A Grafana dashboard is provided in the [./grafana](./grafana) folder.

![Grafana Screenshot](./grafana/grafana.png)

# Deprecated configuration

## cluster_id is deprecated

Historically, the exporter and the Metrics API exposed the ID of the cluster through the label `cluster_id`.
In Metrics API V2, this label has been renamed to `resource.kafka.id`, and the exporter now exposes it as `kafka_id`.

To avoid breaking existing dashboards, the exporter currently exposes the cluster ID under both `cluster_id` and `kafka_id`.
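
A minimal sketch of that dual exposure with the Prometheus Go client follows; the metric name and `emit` helper are illustrative, not the exporter's exact code:

```
package example

import "github.com/prometheus/client_golang/prometheus"

// Both label names are declared on the descriptor; the same cluster ID is
// passed twice, so dashboards keyed on the deprecated cluster_id keep
// working while new ones can migrate to kafka_id.
var desc = prometheus.NewDesc(
	"ccloud_metric_received_bytes", // illustrative metric name
	"Bytes received by the Kafka cluster",
	[]string{"cluster_id", "kafka_id"},
	nil,
)

func emit(ch chan<- prometheus.Metric, clusterID string, value float64) {
	ch <- prometheus.MustNewConstMetric(
		desc,
		prometheus.GaugeValue,
		value,
		clusterID, // cluster_id (deprecated)
		clusterID, // kafka_id
	)
}
```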

## Username/Password authentication is deprecated

In previous versions, it was possible to authenticate to Confluent Cloud with a username and password.
Only API key/secret authentication is now officially supported by the Metrics API.

To preserve backward compatibility, the previous environment variables are still honored.
Nonetheless, username/password is now **deprecated** and you **must** rely on an API key/secret.
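
As a sketch, the two authentication styles look like the following; the variable names are taken from the project's documented environment variables, and all values are placeholders:

```
# Recommended: API key/secret
export CCLOUD_API_KEY="ABCDEFGH"
export CCLOUD_API_SECRET="secret"

# Deprecated: username/password, kept only for backward compatibility
export CCLOUD_USER="me@example.com"
export CCLOUD_PASSWORD="password"
```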


2 changes: 1 addition & 1 deletion cmd/ccloudexporter/ccloudexporter.go
@@ -11,8 +11,8 @@ import (
"github.com/Dabz/ccloudexporter/cmd/internal/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
log "github.com/sirupsen/logrus"
"net/http"
)

func main() {
183 changes: 50 additions & 133 deletions cmd/internal/collector/collector.go
@@ -8,9 +8,7 @@ package collector
//

import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

@@ -31,161 +29,80 @@ type CCloudCollectorMetric struct {
// CCloudCollector is a custom prometheus collector to collect data from
// Confluent Cloud Metrics API
type CCloudCollector struct {
	metrics map[string]CCloudCollectorMetric
	rules   []Rule
}

// Describe collects all metrics for ccloudexporter
func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range cc.metrics {
		ch <- desc.desc
		desc.duration.Describe(ch)
	}
}

type CCloudCollector struct {
	metrics            map[string]CCloudCollectorMetric
	rules              []Rule
	kafkaCollector     *KafkaCCloudCollector
	connectorCollector *ConnectorCCloudCollector
	ksqlCollector      *KsqlCCloudCollector
}

var (
	httpClient http.Client
)

// Describe collects all metrics for ccloudexporter
func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
	cc.kafkaCollector.Describe(ch)
	cc.connectorCollector.Describe(ch)
	cc.ksqlCollector.Describe(ch)
}

// Collect all metrics for Prometheus.
// To avoid reaching the scrape_timeout, metrics are fetched in multiple goroutines.
func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
	var wg sync.WaitGroup
	for _, rule := range cc.rules {
		for _, metric := range rule.Metrics {
			wg.Add(1)
			go cc.CollectMetricsForRule(&wg, ch, rule, cc.metrics[metric])
		}
	}
	wg.Wait()
}

func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
	var wg sync.WaitGroup
	cc.kafkaCollector.Collect(ch, &wg)
	cc.connectorCollector.Collect(ch, &wg)
	cc.ksqlCollector.Collect(ch, &wg)
	wg.Wait()
}

// CollectMetricsForRule collects all metrics for a specific rule
func (cc CCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) {
	defer wg.Done()
	query := BuildQuery(ccmetric.metric, rule.Clusters, rule.GroupByLabels, rule.Topics)
	log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created")
	optimizedQuery, additionalLabels := OptimizeQuery(query)
	log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized")
	durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id))
	timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set))
	response, err := SendQuery(optimizedQuery)
	timer.ObserveDuration()
	ch <- durationMetric
	if err != nil {
		log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed")
		return
	}
	log.WithFields(log.Fields{"response": response}).Traceln("Response has been received")
	cc.handleResponse(response, ccmetric, ch, rule, additionalLabels)
}

func (cc CCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) {
	desc := ccmetric.desc
	for _, dataPoint := range response.Data {
		// Some data points might need to be ignored if it is the global query
		topic, topicPresent := dataPoint["metric.label.topic"].(string)
		cluster, clusterPresent := dataPoint["metric.label.cluster_id"].(string)

		if !clusterPresent {
			cluster, clusterPresent = additionalLabels["metric.label.cluster_id"]
		}

		if !topicPresent {
			topic, topicPresent = additionalLabels["metric.label.topic"]
		}

		if topicPresent && clusterPresent && rule.ShouldIgnoreResultForRule(topic, cluster, ccmetric.metric.Name) {
			continue
		}

		value, ok := dataPoint["value"].(float64)
		if !ok {
			log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float")
			return
		}

		labels := []string{}
		for _, label := range ccmetric.labels {
			labelValue, labelValuePresent := dataPoint["metric.label."+label].(string)
			if !labelValuePresent {
				labelValue, labelValuePresent = additionalLabels["metric.label."+label]
			}
			labels = append(labels, labelValue)
		}

		metric := prometheus.MustNewConstMetric(
			desc,
			prometheus.GaugeValue,
			value,
			labels...,
		)

		if Context.NoTimestamp {
			ch <- metric
		} else {
			timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"]))
			if err != nil {
				log.WithError(err).Errorln("Can not parse timestamp, ignoring the response")
				return
			}
			metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric)
			ch <- metricWithTime
		}
	}
}

// NewCCloudCollector creates a new instance of the collector
// During the creation, we invoke the descriptor endpoint to fetch all
// existing metrics and their labels
func NewCCloudCollector() CCloudCollector {
	collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
	descriptorResponse := SendDescriptorQuery()
	log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received")
	mapOfWhiteListedMetrics := Context.GetMapOfMetrics()

	for _, metr := range descriptorResponse.Data {
		_, metricPresent := mapOfWhiteListedMetrics[metr.Name]
		if !metricPresent {
			continue
		}
		delete(mapOfWhiteListedMetrics, metr.Name)
		metr.Labels = append(metr.Labels, MetricLabel{Key: "cluster_id", Description: "Cluster ID"})
		var labels []string
		for _, metrLabel := range metr.Labels {
			labels = append(labels, metrLabel.Key)
		}

		desc := prometheus.NewDesc(
			"ccloud_metric_"+GetNiceNameForMetric(metr),
			metr.Description,
			labels,
			nil,
		)

		requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Name:        "ccloud_metrics_api_request_latency",
			Help:        "Metrics API request latency",
			ConstLabels: map[string]string{"metric": metr.Name},
		}, []string{"ruleNumber"})

		metric := CCloudCollectorMetric{
			metric:   metr,
			desc:     desc,
			duration: requestDuration,
			labels:   labels,
		}
		collector.metrics[metr.Name] = metric
	}

	log.Traceln("Creating http client")
	httpClient = http.Client{
		Timeout: time.Second * time.Duration(Context.HTTPTimeout),
	}

	var (
		connectorResource ResourceDescription
		kafkaResource     ResourceDescription
		ksqlResource      ResourceDescription
	)
	resourceDescription := SendResourceDescriptorQuery()
	for _, resource := range resourceDescription.Data {
		if resource.Type == "connector" {
			connectorResource = resource
		} else if resource.Type == "kafka" {
			kafkaResource = resource
		} else if resource.Type == "ksql" {
			ksqlResource = resource
		}
	}

	httpClient = http.Client{
		Timeout: time.Second * time.Duration(Context.HTTPTimeout),
	}

	if len(mapOfWhiteListedMetrics) > 0 {
		log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API")
	}

	if connectorResource.Type == "" {
		log.WithField("descriptorResponse", resourceDescription).Fatalln("No connector resource available")
	}

	if kafkaResource.Type == "" {
		log.WithField("descriptorResponse", resourceDescription).Fatalln("No kafka resource available")
	}

	if ksqlResource.Type == "" {
		log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available")
	}

	collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
	kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource)
	connectorCollector := NewConnectorCCloudCollector(collector, connectorResource)
	ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource)

	collector.kafkaCollector = &kafkaCollector
	collector.connectorCollector = &connectorCollector
	collector.ksqlCollector = &ksqlCollector

	return collector
}
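
Putting the pieces together, the wiring in `main` looks roughly like the sketch below; the dedicated registry and the `:2112` listen address are assumptions for illustration, not the project's exact code:

```
package main

import (
	"net/http"

	"github.com/Dabz/ccloudexporter/cmd/internal/collector"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	log "github.com/sirupsen/logrus"
)

func main() {
	// Build the composite collector; the Kafka, connector, and ksqlDB
	// sub-collectors are created inside NewCCloudCollector.
	cc := collector.NewCCloudCollector()

	registry := prometheus.NewRegistry()
	registry.MustRegister(cc)

	// Expose all gathered metrics on /metrics.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	log.Infoln("Listening on :2112")
	log.Fatalln(http.ListenAndServe(":2112", nil))
}
```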