Skip to content

Commit

Permalink
Accept prefix as metric_types for stackdriver metricset in GCP (elast…
Browse files Browse the repository at this point in the history
…ic#19345)

* Add metric_types for stackdriver metricset in GCP
  • Loading branch information
kaiyan-sheng authored and melchiormoulin committed Oct 14, 2020
1 parent af37a06 commit 303988f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add memory metrics into compute googlecloud. {pull}18802[18802]
- Add new fields to HAProxy module. {issue}18523[18523]
- Add Tomcat overview dashboard {pull}14026[14026]
- Accept prefix as metric_types config parameter in googlecloud stackdriver metricset. {pull}19345[19345]
- Update Couchbase to version 6.5 {issue}18595[18595] {pull}19055[19055]
- Add dashboards for googlecloud load balancing metricset. {pull}18369[18369]
- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ call.
[float]
== Metricset config and parameters

* *metric_types*: Required, a list of metric type strings. Each call of the
* *metric_types*: Required, a list of metric type strings, or a list of metric
type prefixes. For example, `instance/cpu` is the prefix for metric type
`instance/cpu/usage_time`, `instance/cpu/utilization` etc Each call of the
`ListTimeSeries` API can return any number of time series from a single metric
type. Metric type is to used for identifying a specific time series.

* *aligner*: A single string with which aggregation operation need to be applied
onto time series data for ListTimeSeries API. If it's not given, default aligner
is set to be `ALIGN_NONE`. Google Cloud also supports `ALIGN_DELTA`, `ALIGN_RATE`,
`ALIGN_MIN`, `ALIGN_MAX`, `ALIGN_MEAN`, `ALIGN_COUNT`, `ALIGN_SUM` and etc.
is `ALIGN_NONE`. Google Cloud also supports `ALIGN_DELTA`, `ALIGN_RATE`,
`ALIGN_MIN`, `ALIGN_MAX`, `ALIGN_MEAN`, `ALIGN_COUNT`, `ALIGN_SUM` etc.
Please see
https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#aligner[Aggregation Aligner]
for the full list of aligners.


[float]
=== Example Configuration
* `stackdriver` metricset is enabled to collect metrics from all zones under
Expand All @@ -37,7 +38,7 @@ are specified: first one is to collect CPU usage time and utilization with
aggregation aligner ALIGN_MEAN; second one is to collect uptime with aggregation
aligner ALIGN_SUM. These metric types all have 240 seconds ingest delay time and
60 seconds sample period. With `period` specified as `300s` in the config below,
Metricbeat will collect compute metrics from googlecloud every 5-minute with
Metricbeat will collect compute metrics from Google Cloud every 5-minute with
given aggregation aligner applied for each metric type.
+
[source,yaml]
Expand Down Expand Up @@ -69,7 +70,7 @@ are specified: first one is to collect CPU usage time and utilization with
aggregation aligner ALIGN_MEAN; second one is to collect uptime with aggregation
aligner ALIGN_SUM. These metric types all have 240 seconds ingest delay time and
60 seconds sample period. With `period` specified as `60s` in the config below,
Metricbeat will collect compute metrics from googlecloud every minute with no
Metricbeat will collect compute metrics from Google Cloud every minute with no
aggregation. This case, the aligners specified in the configuration will be
ignored.
+
Expand All @@ -94,3 +95,32 @@ ignored.
metric_types:
- "instance/uptime"
----

* `stackdriver` metricset is enabled to collect metrics from all zones under
`europe-west1-c` region in `elastic-observability` project. One set of metrics
will be collected: metric types that starts with `instance/cpu` under `compute`
service with aligner ALIGN_NONE. These metric types all have 240 seconds ingest
delay time and 60 seconds sample period. With `period` specified as `1m` in
the config below, Metricbeat will collect compute metrics from Google Cloud
every minute with no aggregation. The metric types in `compute` service with
`instance/cpu` prefix are: `instance/cpu/reserved_cores`,
`instance/cpu/scheduler_wait_time`, `instance/cpu/usage_time`, and
`instance/cpu/utilization`.

+
[source,yaml]
----
- module: googlecloud
metricsets:
- stackdriver
zone: "europe-west1-c"
project_id: elastic-observability
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu"
----
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,16 @@ func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, sdc stackDriv
results := make([]timeSeriesWithAligner, 0)

aligner := sdc.Aligner
serviceName := sdc.ServiceName
for _, mt := range sdc.MetricTypes {
for mt, meta := range metricsMeta {
wg.Add(1)

metricMeta := meta
go func(mt string) {
defer wg.Done()

metricMeta := metricsMeta[mt]
r.logger.Debugf("For metricType %s, metricMeta = %s", mt, metricMeta)
interval, aligner := getTimeIntervalAligner(metricMeta.ingestDelay, metricMeta.samplePeriod, r.config.period, aligner)
ts := r.Metric(ctx, serviceName+".googleapis.com/"+mt, interval, aligner)
ts := r.Metric(ctx, mt, interval, aligner)
lock.Lock()
defer lock.Unlock()
results = append(results, ts)
Expand Down
72 changes: 42 additions & 30 deletions x-pack/metricbeat/module/googlecloud/stackdriver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"fmt"
"time"

"github.com/golang/protobuf/ptypes/duration"

monitoring "cloud.google.com/go/monitoring/apiv3"

"github.com/golang/protobuf/ptypes/duration"
"github.com/pkg/errors"

"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -216,41 +215,54 @@ func (mc *stackDriverConfig) Validate() error {
// (sample period and ingest delay) of each given metric type
func (m *MetricSet) metricDescriptor(ctx context.Context, client *monitoring.MetricClient) (map[string]metricMeta, error) {
metricsWithMeta := make(map[string]metricMeta, 0)
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: "projects/" + m.config.ProjectID,
}

for _, sdc := range m.stackDriverConfig {
for _, mt := range sdc.MetricTypes {
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: "projects/" + m.config.ProjectID,
Filter: fmt.Sprintf(`metric.type = "%s"`, sdc.ServiceName+".googleapis.com/"+mt),
}

req.Filter = fmt.Sprintf(`metric.type = starts_with("%s")`, sdc.ServiceName+".googleapis.com/"+mt)
it := client.ListMetricDescriptors(ctx, req)
out, err := it.Next()
if err != nil {
err = errors.Errorf("Could not make ListMetricDescriptors request: %s: %v", mt, err)
m.Logger().Error(err)
return metricsWithMeta, err
}

// Set samplePeriod default to 60 seconds and ingestDelay default to 0.
meta := metricMeta{
samplePeriod: 60 * time.Second,
ingestDelay: 0 * time.Second,
for {
out, err := it.Next()
if err != nil && err != iterator.Done {
err = errors.Errorf("Could not make ListMetricDescriptors request for metric type %s: %v", mt, err)
m.Logger().Error(err)
return metricsWithMeta, err
}

if out != nil {
metricsWithMeta = m.getMetadata(out, metricsWithMeta)
}

if err == iterator.Done {
break
}
}
}
}

if out.Metadata.SamplePeriod != nil {
m.Logger().Debugf("For metric type %s: sample period = %s", mt, out.Metadata.SamplePeriod)
meta.samplePeriod = time.Duration(out.Metadata.SamplePeriod.Seconds) * time.Second
}
return metricsWithMeta, nil
}

if out.Metadata.IngestDelay != nil {
m.Logger().Debugf("For metric type %s: ingest delay = %s", mt, out.Metadata.IngestDelay)
meta.ingestDelay = time.Duration(out.Metadata.IngestDelay.Seconds) * time.Second
}
func (m *MetricSet) getMetadata(out *metric.MetricDescriptor, metricsWithMeta map[string]metricMeta) map[string]metricMeta {
// Set samplePeriod default to 60 seconds and ingestDelay default to 0.
meta := metricMeta{
samplePeriod: 60 * time.Second,
ingestDelay: 0 * time.Second,
}

metricsWithMeta[mt] = meta
}
if out.Metadata.SamplePeriod != nil {
m.Logger().Debugf("For metric type %s: sample period = %s", out.Type, out.Metadata.SamplePeriod)
meta.samplePeriod = time.Duration(out.Metadata.SamplePeriod.Seconds) * time.Second
}

return metricsWithMeta, nil
if out.Metadata.IngestDelay != nil {
m.Logger().Debugf("For metric type %s: ingest delay = %s", out.Type, out.Metadata.IngestDelay)
meta.ingestDelay = time.Duration(out.Metadata.IngestDelay.Seconds) * time.Second
}

metricsWithMeta[out.Type] = meta
return metricsWithMeta
}

0 comments on commit 303988f

Please sign in to comment.