Skip to content

Commit

Permalink
Updating elasticsearch/ccr metricset with ES ccr_stats API changes (e…
Browse files Browse the repository at this point in the history
…lastic#9179)

Resolves elastic/kibana#25778.

The ES CCR API changed slightly before the 6.5.0 feature freeze (FF), and we didn't catch these changes 😞. This PR updates the CCR metricset code to work with the new ES CCR API response structure.

## Testing this PR
1. Set up CCR: https://www.elastic.co/guide/en/elastic-stack-overview/current/ccr-getting-started.html
2. Check out this PR and `make` Metricbeat: `cd metricbeat; make`
3. Enable the `elasticsearch` module: `./metricbeat modules enable elasticsearch`
4. Edit `modules.d/elasticsearch.yml` and make sure the `ccr` metricset is enabled and `xpack.enabled: true` is set.
5. Start Metricbeat: `./metricbeat -e`
6. After about 10-20 seconds, check that Metricbeat indexed the correct document of `type` = `ccr_stats` into `.monitoring-es-mb-6-*`.
  • Loading branch information
ycombinator committed Nov 22, 2018
1 parent d6589d1 commit d4d2e0c
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 155 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

- Fix race condition when enriching events with kubernetes metadata. {issue}9055[9055] {issue}9067[9067]
- Fix panic on docker healthcheck collection on dockers without healthchecks. {pull}9171[9171]
- Fix issue with not collecting Elasticsearch cross-cluster replication stats correctly. {pull}9179[9179]

*Packetbeat*

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3510,7 +3510,7 @@ Number of the shard within the index
--
*`elasticsearch.ccr.follower.operations_indexed`*::
*`elasticsearch.ccr.follower.operations_written`*::
+
--
type: long
Expand All @@ -3520,7 +3520,7 @@ Number of operations indexed (replicated) into the follower shard from the leade
--
*`elasticsearch.ccr.follower.time_since_last_fetch.ms`*::
*`elasticsearch.ccr.follower.time_since_last_read.ms`*::
+
--
type: long
Expand Down
20 changes: 10 additions & 10 deletions metricbeat/module/elasticsearch/ccr/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"elasticsearch": {
"ccr": {
"follower": {
"global_checkpoint": 1,
"index": "my_follower",
"operations_indexed": 2,
"global_checkpoint": -1,
"index": "my_index_f",
"operations_written": 0,
"shard": {
"number": 0
},
"time_since_last_fetch": {
"ms": 4926
"time_since_last_read": {
"ms": 42294
}
},
"leader": {
"index": "my_leader",
"max_seq_no": 1
"index": "my_index",
"max_seq_no": -1
}
},
"cluster": {
"id": "KSGkOjOuSg6whAgtpPyhQw",
"name": "elasticsearch"
"id": "3LbUkLkURz--FR-YO0wLNA",
"name": "es1"
}
},
"metricset": {
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/ccr/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
type: long
description: >
Number of the shard within the index
- name: operations_indexed
- name: operations_written
type: long
description: >
Number of operations indexed (replicated) into the follower shard from the leader shard
- name: time_since_last_fetch.ms
- name: time_since_last_read.ms
type: long
description: >
Time, in ms, since the follower last fetched from the leader
Expand Down
138 changes: 43 additions & 95 deletions metricbeat/module/elasticsearch/ccr/_meta/test/ccr_stats.700.json
Original file line number Diff line number Diff line change
@@ -1,98 +1,46 @@
{
"my_follower": [
{
"leader_index": "my_leader",
"follower_index": "my_follower",
"shard_id": 0,
"leader_global_checkpoint": 1,
"leader_max_seq_no": 1,
"follower_global_checkpoint": 1,
"follower_max_seq_no": 1,
"last_requested_seq_no": 1,
"number_of_concurrent_reads": 1,
"number_of_concurrent_writes": 0,
"number_of_queued_writes": 0,
"mapping_version": 2,
"total_fetch_time_millis": 21,
"number_of_successful_fetches": 142,
"number_of_failed_fetches": 6743,
"operations_received": 2,
"total_transferred_bytes": 166,
"total_index_time_millis": 48,
"number_of_successful_bulk_operations": 2,
"number_of_failed_bulk_operations": 0,
"number_of_operations_indexed": 2,
"fetch_exceptions": [
{
"from_seq_no": 2,
"retries": 6743,
"exception": {
"type": "exception",
"reason": "NoShardAvailableActionException[No shard available for [Request{fromSeqNo=2, maxOperationCount=1024, shardId=[leader][0], expectedHistoryUUID=ki9Do9c-QQKVydB-6Txkdg, maxOperationSizeInBytes=9223372036854775807}]]; nested: RemoteTransportException[[d0XH9XU][127.0.0.1:9300][indices:data/read/xpack/ccr/shard_changes[s]]]; nested: IndexNotFoundException[no such index];",
"caused_by": {
"type": "no_shard_available_action_exception",
"reason": "No shard available for [Request{fromSeqNo=2, maxOperationCount=1024, shardId=[leader][0], expectedHistoryUUID=ki9Do9c-QQKVydB-6Txkdg, maxOperationSizeInBytes=9223372036854775807}]",
"caused_by": {
"type": "index_not_found_exception",
"reason": "no such index",
"index_uuid": "RH8_j_w0Q0GGmpY0HMVQ_A",
"index": "my_leader"
}
}
"auto_follow_stats": {
"number_of_failed_follow_indices": 0,
"number_of_failed_remote_cluster_state_requests": 0,
"number_of_successful_follow_indices": 1,
"recent_auto_follow_errors": []
},
"follow_stats": {
"indices": [
{
"index": "follower_index",
"shards": [
{
"remote_cluster": "remote_cluster",
"leader_index": "leader_index",
"follower_index": "follower_index",
"shard_id": 0,
"leader_global_checkpoint": 1024,
"leader_max_seq_no": 1536,
"follower_global_checkpoint": 768,
"follower_max_seq_no": 896,
"last_requested_seq_no": 897,
"outstanding_read_requests": 8,
"outstanding_write_requests": 2,
"write_buffer_operation_count": 64,
"follower_mapping_version": 4,
"follower_settings_version": 2,
"total_read_time_millis": 32768,
"total_read_remote_exec_time_millis": 16384,
"successful_read_requests": 32,
"failed_read_requests": 0,
"operations_read": 896,
"bytes_read": 32768,
"total_write_time_millis": 16384,
"write_buffer_size_in_bytes": 1536,
"successful_write_requests": 16,
"failed_write_requests": 0,
"operations_written": 832,
"read_exceptions": [],
"time_since_last_read_millis": 8
}
}
],
"time_since_last_fetch_millis": 470
},
{
"leader_index": "my_leader",
"follower_index": "my_follower",
"shard_id": 1,
"leader_global_checkpoint": -1,
"leader_max_seq_no": -1,
"follower_global_checkpoint": -1,
"follower_max_seq_no": -1,
"last_requested_seq_no": -1,
"number_of_concurrent_reads": 0,
"number_of_concurrent_writes": 0,
"number_of_queued_writes": 0,
"mapping_version": 2,
"total_fetch_time_millis": 0,
"number_of_successful_fetches": 336,
"number_of_failed_fetches": 0,
"operations_received": 0,
"total_transferred_bytes": 0,
"total_index_time_millis": 0,
"number_of_successful_bulk_operations": 0,
"number_of_failed_bulk_operations": 0,
"number_of_operations_indexed": 0,
"fetch_exceptions": [],
"time_since_last_fetch_millis": 4323
},
{
"leader_index": "my_leader",
"follower_index": "my_follower",
"shard_id": 2,
"leader_global_checkpoint": 1,
"leader_max_seq_no": 1,
"follower_global_checkpoint": 1,
"follower_max_seq_no": 1,
"last_requested_seq_no": 1,
"number_of_concurrent_reads": 0,
"number_of_concurrent_writes": 0,
"number_of_queued_writes": 0,
"mapping_version": 2,
"total_fetch_time_millis": 0,
"number_of_successful_fetches": 372,
"number_of_failed_fetches": 0,
"operations_received": 2,
"total_transferred_bytes": 166,
"total_index_time_millis": 32,
"number_of_successful_bulk_operations": 2,
"number_of_failed_bulk_operations": 0,
"number_of_operations_indexed": 2,
"fetch_exceptions": [],
"time_since_last_fetch_millis": 4323
}
]
]
}
]
}
}
40 changes: 15 additions & 25 deletions metricbeat/module/elasticsearch/ccr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package ccr

import (
"encoding/json"
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
Expand All @@ -42,17 +41,25 @@ var (
"shard": s.Object{
"number": c.Int("shard_id"),
},
"operations_indexed": c.Int("number_of_operations_indexed"),
"time_since_last_fetch": s.Object{
"ms": c.Int("time_since_last_fetch_millis"),
"operations_written": c.Int("operations_written"),
"time_since_last_read": s.Object{
"ms": c.Int("time_since_last_read_millis"),
},
"global_checkpoint": c.Int("follower_global_checkpoint"),
},
}
)

// response is the decoding target for the Elasticsearch CCR Stats API
// response body. Only the follow_stats.indices[].shards path is declared;
// every other key in the payload is ignored by json.Unmarshal.
type response struct {
FollowStats struct {
Indices []struct {
// Each shard entry is kept as a generic map so it can be handed
// directly to the shard schema or embedded unchanged in an event.
Shards []map[string]interface{} `json:"shards"`
} `json:"indices"`
} `json:"follow_stats"`
}

func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error {
var data map[string]interface{}
var data response
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
Expand All @@ -61,17 +68,8 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
}

var errs multierror.Errors
for _, followerShards := range data {

shards, ok := followerShards.([]interface{})
if !ok {
err := fmt.Errorf("shards is not an array")
errs = append(errs, err)
r.Error(err)
continue
}

for _, s := range shards {
for _, followerIndex := range data.FollowStats.Indices {
for _, followerShard := range followerIndex.Shards {
event := mb.Event{}
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)
Expand All @@ -80,15 +78,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

shard, ok := s.(map[string]interface{})
if !ok {
event.Error = fmt.Errorf("shard is not an object")
r.Event(event)
errs = append(errs, event.Error)
continue
}

event.MetricSetFields, err = schema.Apply(shard)
event.MetricSetFields, err = schema.Apply(followerShard)
if err != nil {
event.Error = errors.Wrap(err, "failure applying shard schema")
r.Event(event)
Expand Down
27 changes: 7 additions & 20 deletions metricbeat/module/elasticsearch/ccr/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package ccr

import (
"encoding/json"
"fmt"
"time"

"github.com/joeshaw/multierror"
Expand All @@ -32,36 +31,24 @@ import (
)

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var data map[string]interface{}
var data response
err := json.Unmarshal(content, &data)
if err != nil {
return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
}

var errors multierror.Errors
for _, followerShards := range data {

shards, ok := followerShards.([]interface{})
if !ok {
err := fmt.Errorf("shards is not an array")
errors = append(errors, err)
continue
}

for _, s := range shards {
shard, ok := s.(map[string]interface{})
if !ok {
err := fmt.Errorf("shard is not an object")
errors = append(errors, err)
continue
}
for _, followerIndex := range data.FollowStats.Indices {
for _, followerShard := range followerIndex.Shards {
event := mb.Event{}
event.RootFields = common.MapStr{
"cluster_uuid": info.ClusterID,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "ccr_stats",
"ccr_stats": shard,
"ccr_stats": followerShard,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d4d2e0c

Please sign in to comment.