Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rabbitmq exchanges metricset #6955

Merged
merged 4 commits into from
May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add experimental Elasticsearch index metricset. {pull}6881[6881]
- Add dashboards and visualizations for haproxy metrics. {pull}6934[6934]
- Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606]
- Add exchanges metricset to the RabbitMQ module {issue}6442[6442] {pull}6607[6607]

*Packetbeat*

Expand Down
107 changes: 107 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10912,6 +10912,113 @@ type: long
Number of octets received on the connection.


--

[float]
== exchange fields

exchange



*`rabbitmq.exchange.name`*::
+
--
type: keyword

The name of the queue with non-ASCII characters escaped as in C.


--

*`rabbitmq.exchange.vhost`*::
+
--
type: keyword

Virtual host name with non-ASCII characters escaped as in C.


--

*`rabbitmq.exchange.durable`*::
+
--
type: boolean

Whether or not the queue survives server restarts.


--

*`rabbitmq.exchange.auto_delete`*::
+
--
type: boolean

Whether the queue will be deleted automatically when no longer used.


--

*`rabbitmq.exchange.internal`*::
+
--
type: boolean

Whether the exchange is internal, i.e. cannot be directly published to by a client.


--

*`rabbitmq.exchange.user`*::
+
--
type: keyword

User who created the exchange.


--

*`rabbitmq.exchange.messages.publish_in.count`*::
+
--
type: long

Count of messages published "in" to an exchange, i.e. not taking account of routing.


--

*`rabbitmq.exchange.messages.publish_in.details.rate`*::
+
--
type: float

How much the exchange publish-in count has changed per second in the most recent sampling interval.


--

*`rabbitmq.exchange.messages.publish_out.count`*::
+
--
type: long

Count of messages published "out" of an exchange, i.e. taking account of routing.


--

*`rabbitmq.exchange.messages.publish_out.details.rate`*::
+
--
type: float

How much the exchange publish-out count has changed per second in the most recent sampling interval.


--

[float]
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/rabbitmq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ The following metricsets are available:

* <<metricbeat-metricset-rabbitmq-connection,connection>>

* <<metricbeat-metricset-rabbitmq-exchange,exchange>>

* <<metricbeat-metricset-rabbitmq-node,node>>

* <<metricbeat-metricset-rabbitmq-queue,queue>>

include::rabbitmq/connection.asciidoc[]

include::rabbitmq/exchange.asciidoc[]

include::rabbitmq/node.asciidoc[]

include::rabbitmq/queue.asciidoc[]
Expand Down
23 changes: 23 additions & 0 deletions metricbeat/docs/modules/rabbitmq/exchange.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-rabbitmq-exchange]]
=== RabbitMQ exchange metricset

beta[]

include::../../../module/rabbitmq/exchange/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-rabbitmq,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/rabbitmq/exchange/_meta/data.json[]
----
3 changes: 2 additions & 1 deletion metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ This file is generated! See scripts/docs_collector.py
.2+| .2+| |<<metricbeat-metricset-prometheus-collector,collector>> beta[]
|<<metricbeat-metricset-prometheus-stats,stats>> beta[]
|<<metricbeat-module-rabbitmq,RabbitMQ>> beta[] |image:./images/icon-yes.png[Prebuilt dashboards are available] |
.3+| .3+| |<<metricbeat-metricset-rabbitmq-connection,connection>> beta[]
.4+| .4+| |<<metricbeat-metricset-rabbitmq-connection,connection>> beta[]
|<<metricbeat-metricset-rabbitmq-exchange,exchange>> beta[]
|<<metricbeat-metricset-rabbitmq-node,node>> beta[]
|<<metricbeat-metricset-rabbitmq-queue,queue>> beta[]
|<<metricbeat-module-redis,Redis>> |image:./images/icon-yes.png[Prebuilt dashboards are available] |
Expand Down
1 change: 1 addition & 0 deletions metricbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import (
_ "github.com/elastic/beats/metricbeat/module/prometheus/stats"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/connection"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/exchange"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/node"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/queue"
_ "github.com/elastic/beats/metricbeat/module/redis"
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/_meta/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rabbitmq:3-management
FROM rabbitmq:3.7.4-management

RUN apt-get update && apt-get install -y netcat && apt-get clean
HEALTHCHECK --interval=1s --retries=90 CMD nc -w 1 -v 127.0.0.1 15672 </dev/null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[
{
"message_stats": {
"publish_in": 100,
"publish_in_details": {
"rate": 0.5
},
"publish_out": 99,
"publish_out_details": {
"rate": 0.9
}
},
"user_who_performed_action": "guest",
"name": "exchange.name",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
]
1 change: 1 addition & 0 deletions metricbeat/module/rabbitmq/connection/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
err := json.Unmarshal(content, &connections)
if err != nil {
logp.Err("Error: ", err)
return nil, err
}

events := []common.MapStr{}
Expand Down
45 changes: 45 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"metricset": {
"host": "localhost:15672",
"module": "rabbitmq",
"name": "exchange",
"rtt": 115
},
"rabbitmq": {
"exchange": {
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"messages": {
"ack": {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally in case of empty objects they should not be added. Perhaps an issue in the schema? Not a blocker of the PR but worth mentioning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found how to avoid it, they are definned as optional schema.Objects, there would be other way to define them?

                "messages": c.Dict("message_stats", s.Schema{
                        "publish": s.Object{
                                "count": c.Int("publish", s.Optional),
                                "details": c.Dict("publish_details", s.Schema{
                                        "rate": c.Float("rate"),
                                }, c.DictOptional),
                        },
                        ...
                }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin would it be ok to define the schema with dotted field names?

                "messages": c.Dict("message_stats", s.Schema{
                        "publish.count": c.Int("publish", s.Optional),
                        "publish.details": c.Dict("publish_details", s.Schema{
                                "rate": c.Float("rate"),
                        }, c.DictOptional),

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this will have any affect on the output in the data.json?

For the empty objects: Can you try to set the surrounding doc as optional too? Just had a look at ack and I think it's not set to optional.

"confirm": {},
"deliver_get": {},
"publish": {},
"publish_in": {
"count": 607,
"details": {
"rate": 4
}
},
"publish_out": {
"count": 547,
"details": {
"rate": 4
}
},
"redeliver": {},
"return_unroutable": {}
},
"name": "",
"type": "direct",
"user": "rmq-internal",
"vhost": "/"
}
}
}
3 changes: 3 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
=== rabbitmq exchange MetricSet

This is the exchange metricset of the module rabbitmq.
47 changes: 47 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
- name: exchange
type: group
description: >
exchange
release: beta
fields:
- name: name
type: keyword
description: >
The name of the queue with non-ASCII characters escaped as in C.
- name: vhost
type: keyword
description: >
Virtual host name with non-ASCII characters escaped as in C.
- name: durable
type: boolean
description: >
Whether or not the queue survives server restarts.
- name: auto_delete
type: boolean
description: >
Whether the queue will be deleted automatically when no longer used.
- name: internal
type: boolean
description: >
Whether the exchange is internal, i.e. cannot be directly published to by a client.
- name: user
type: keyword
description: >
User who created the exchange.

- name: messages.publish_in.count
type: long
description: >
Count of messages published "in" to an exchange, i.e. not taking account of routing.
- name: messages.publish_in.details.rate
type: float
description: >
How much the exchange publish-in count has changed per second in the most recent sampling interval.
- name: messages.publish_out.count
type: long
description: >
Count of messages published "out" of an exchange, i.e. taking account of routing.
- name: messages.publish_out.details.rate
type: float
description: >
How much the exchange publish-out count has changed per second in the most recent sampling interval.
61 changes: 61 additions & 0 deletions metricbeat/module/rabbitmq/exchange/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package exchange

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/libbeat/logp"
)

var (
schema = s.Schema{
"name": c.Str("name"),
"vhost": c.Str("vhost"),
"type": c.Str("type"),
"durable": c.Bool("durable"),
"auto_delete": c.Bool("auto_delete"),
"internal": c.Bool("internal"),
"arguments": c.Dict("arguments", s.Schema{}),
"user": c.Str("user_who_performed_action", s.Optional),
"messages": c.Dict("message_stats", s.Schema{
"publish_in": s.Object{
"count": c.Int("publish_in", s.Optional),
"details": c.Dict("publish_in_details", s.Schema{
"rate": c.Float("rate"),
}, c.DictOptional),
},
"publish_out": s.Object{
"count": c.Int("publish_out", s.Optional),
"details": c.Dict("publish_out_details", s.Schema{
"rate": c.Float("rate"),
}, c.DictOptional),
},
}, c.DictOptional),
}
)

func eventsMapping(content []byte) ([]common.MapStr, error) {
var exchanges []map[string]interface{}
err := json.Unmarshal(content, &exchanges)
if err != nil {
logp.Err("Error: ", err)
return nil, err
}

events := []common.MapStr{}
errors := s.NewErrors()

for _, exchange := range exchanges {
event, errs := eventMapping(exchange)
events = append(events, event)
errors.AddErrors(errs)
}

return events, errors
}

func eventMapping(exchange map[string]interface{}) (common.MapStr, *s.Errors) {
return schema.Apply(exchange)
}
Loading