Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emqtt batch output #4094

Merged
merged 46 commits into from
May 19, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
c8b3f7a
Adds code to batch messages sent on a topic in a flush interval
jvrahav May 3, 2018
3cd8a34
Adds code to batch messages sent on a topic in a flush interval
jvrahav May 3, 2018
df6fea7
modified telegraf configuration file entry
jvrahav May 3, 2018
821f5c6
formatted code
jvrahav May 3, 2018
ea98635
Ignore UTF8 BOM in JSON parser (#4099)
dmeiners88 May 3, 2018
468ff6f
Update changelog
danielnelson May 3, 2018
f957ca6
Update gopsutil version
danielnelson May 3, 2018
96146a7
Clarify max_retry option in kafka output
danielnelson May 4, 2018
7724344
Fix grammar
danielnelson May 4, 2018
7b14329
Remove -i flag from `make telegraf`
danielnelson May 4, 2018
7e2f219
Move usage string to internal to fix `go run`
danielnelson May 4, 2018
8df34fe
Fix platform not supported error in build.py (#4102)
otherpirate May 4, 2018
a529da0
Fix name_override example in mysql rreadme (#4100)
nsteinmetz May 4, 2018
7845d10
Only lowercase mysql slave metrics with metric_version = 2
danielnelson May 4, 2018
a863b3e
Update kafka readme
danielnelson May 4, 2018
5bb797e
Simplify testing with TLS (#4095)
danielnelson May 4, 2018
403b515
Add SerializeBatch method to the Serializer interface (#4107)
danielnelson May 5, 2018
6875f34
Update changelog
danielnelson May 5, 2018
77ae07b
Add topk processor plugin (#4096)
mirath May 5, 2018
3f09c69
Update changelog
danielnelson May 5, 2018
5f1fafd
Update issue templates (#4116)
danielnelson May 7, 2018
e2e6fe0
Remove combined issue template
danielnelson May 7, 2018
eb15ece
Add cursor metrics to mongodb input (#4114)
grubernaut May 7, 2018
e5216d3
Update changelog
danielnelson May 7, 2018
5c9c40f
Run apt-get update in release.sh
danielnelson May 7, 2018
9611788
Don't report 0ms on timeout in dns_query (#4118)
danielnelson May 8, 2018
d43acc1
Update changelog
danielnelson May 8, 2018
ad9dfdf
Add instructions on how to repair windows performance counters
danielnelson May 8, 2018
9b0e011
Add uint/bool support to cratedb output (#4117)
danielnelson May 8, 2018
814bf15
Update changelog
danielnelson May 8, 2018
e3cabe3
Skip fields that report "not supported" in nvidia-smi (#4123)
danielnelson May 8, 2018
c3bad9d
Add tag/integer pair for result to net_response (#3455)
morfien101 May 8, 2018
9a0fc78
Use result and result_code in net_response
danielnelson May 8, 2018
7ec960e
Update changelog
danielnelson May 8, 2018
ad40c58
Clarify max_retry option in kafka output
danielnelson May 4, 2018
0b0d466
Fix grammar
danielnelson May 4, 2018
10128c5
Update kafka readme
danielnelson May 4, 2018
b1db50f
Add tag/integer pair for result to net_response (#3455)
morfien101 May 8, 2018
322dfa2
Use result and result_code in net_response
danielnelson May 8, 2018
509d5e1
Merge branch 'master' into emqtt-batch-output
jvrahav May 9, 2018
64ad442
Modifies emqtt to use SerializeBatch for batch mode of operation
jvrahav May 9, 2018
150b344
removed serializeBatchMetric as we need to process each metric at a time
jvrahav May 10, 2018
95ac23e
removed serializeBatchMetric as we need to process each metric at a time
jvrahav May 10, 2018
266cdad
removed serializeBatchMetric as we need to process each metric at a time
jvrahav May 10, 2018
8e2a408
updated Readme
jvrahav May 11, 2018
f6d067f
modified code to use serializeBatch method for all metrics
jvrahav May 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,13 @@
# ## Use SSL but skip chain & host verification
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this file, I'll generate the telegraf.conf later from the sample configurations.

# # insecure_skip_verify = false
#
# ## Batch messages in a topic
# ## batch = false
# ## Flag to determine if messages sent in a topic in a flush interval,
# ## need to be batched into one message.
# ## batch = true, batches the messages in a topic to one messages
# ## batch = false, default behaviour
#
# ## Data format to output.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
Expand Down Expand Up @@ -1740,19 +1747,19 @@
# ## List of metrics collected on above servers
# ## Each metric consists in a name, a jmx path and either
# ## a pass or drop slice attribute.
# ## This collect all heap memory usage metrics.
# ## This collect all heap memory usage metrics.
# [[inputs.jolokia.metrics]]
# name = "heap_memory_usage"
# mbean = "java.lang:type=Memory"
# attribute = "HeapMemoryUsage"
#
# ## This collect thread counts metrics.
# ## This collect thread counts metrics.
# [[inputs.jolokia.metrics]]
# name = "thread_count"
# mbean = "java.lang:type=Threading"
# attribute = "TotalStartedThreadCount,ThreadCount,DaemonThreadCount,PeakThreadCount"
#
# ## This collect number of class loaded/unloaded counts metrics.
# ## This collect number of class loaded/unloaded counts metrics.
# [[inputs.jolokia.metrics]]
# name = "class_count"
# mbean = "java.lang:type=ClassLoading"
Expand Down Expand Up @@ -2201,7 +2208,7 @@
# reverse_metric_names = true


# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# # A plugin to collect stats from Opensmtpd - a validating, recursive, and caching DNS resolver
# [[inputs.opensmtpd]]
# ## If running as a restricted user you can prepend sudo for additional access:
# #use_sudo = false
Expand Down Expand Up @@ -3456,4 +3463,3 @@
# [[inputs.zipkin]]
# # path = "/api/v1/spans" # URL path for span data
# # port = 9411 # Port on which Telegraf listens

4 changes: 4 additions & 0 deletions plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Batch messages sent on a topic in a flush interval
# batch = false

## Data format to output.
data_format = "influx"
```
Expand All @@ -49,4 +52,5 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
* `ssl_cert`: SSL CERT
* `ssl_key`: SSL key
* `insecure_skip_verify`: Use SSL but skip chain & host verification (default: false)
* `batch`: Batch messages sent on a topic within a flush interval (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
32 changes: 23 additions & 9 deletions plugins/outputs/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ var sampleConfig = `
`

type MQTT struct {
Servers []string `toml:"servers"`
Username string
Password string
Database string
Timeout internal.Duration
TopicPrefix string
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
Servers []string `toml:"servers"`
Username string
Password string
Database string
Timeout internal.Duration
TopicPrefix string
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
BatchMessage bool `toml:"batch"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation for the new option into the SampleConfig(), then run telegraf -usage mqtt to get the output for updating the README.


// Path to CA file
SSLCA string `toml:"ssl_ca"`
Expand Down Expand Up @@ -124,6 +125,8 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
hostname = ""
}

metricsmap := make(map[string][]byte)

for _, metric := range metrics {
var t []string
if m.TopicPrefix != "" {
Expand All @@ -141,7 +144,18 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
return err
}

err = m.publish(topic, buf)
if m.BatchMessage {
metricsmap[topic] = append(metricsmap[topic], buf...)
} else {
err = m.publish(topic, buf)
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
}
}
}

for key := range metricsmap {
err := m.publish(key, metricsmap[key])
if err != nil {
return fmt.Errorf("Could not write to MQTT server, %s", err)
}
Expand Down