Skip to content

Commit

Permalink
Add TCP listener for statsd input (#2293)
Browse files Browse the repository at this point in the history
  • Loading branch information
szibis authored and danielnelson committed Aug 8, 2017
1 parent f9573ad commit f3435f1
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
- [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
- [#2293](https://github.com/influxdata/telegraf/pull/2293): Add TCP listener for statsd input
- [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
- [#2817](https://github.com/influxdata/telegraf/pull/2817): Add timeout option to IPMI sensor plugin
- [#2883](https://github.com/influxdata/telegraf/pull/2883): Add support for an optional SSL/TLS configuration to nginx input plugin
Expand Down
4 changes: 4 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,10 @@

# # Statsd Server
# [[inputs.statsd]]
# ## Protocol, must be "tcp" or "udp"
# protocol = "udp"
# ## Maximum number of concurrent TCP connections to allow
# max_tcp_connections = 250
# ## Address and port to host UDP listener on
# service_address = ":8125"
#
Expand Down
9 changes: 9 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
```toml
# Statsd Server
[[inputs.statsd]]
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"

## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250

## Address and port to host UDP listener on
service_address = ":8125"

Expand Down Expand Up @@ -146,6 +152,9 @@ metric type:

### Plugin arguments

- **protocol** string: Protocol used in listener - tcp or udp options
- **max_tcp_connections** []int: Maximum number of concurrent TCP connections
to allow. Used when protocol is set to tcp.
- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
Expand Down
213 changes: 205 additions & 8 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package statsd

import (
"bufio"
"errors"
"fmt"
"log"
Expand All @@ -14,7 +15,9 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)

const (
Expand All @@ -24,15 +27,24 @@ const (

defaultFieldName = "value"

defaultProtocol = "udp"

defaultSeparator = "_"
defaultAllowPendingMessage = 10000
MaxTCPConnections = 250
)

var dropwarn = "E! Error: statsd message queue full. " +
"We have dropped %d messages so far. " +
"You may want to increase allowed_pending_messages in the config\n"

var malformedwarn = "E! Statsd over TCP has received %d malformed packets" +
" thus far."

type Statsd struct {
// Protocol used on listener - udp or tcp
Protocol string `toml:"protocol"`

// Address & Port to serve from
ServiceAddress string

Expand Down Expand Up @@ -64,9 +76,17 @@ type Statsd struct {
UDPPacketSize int `toml:"udp_packet_size"`

sync.Mutex
wg sync.WaitGroup
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int

// Channel for all incoming statsd packets
in chan []byte
Expand All @@ -83,9 +103,24 @@ type Statsd struct {
// bucket -> influx templates
Templates []string

listener *net.UDPConn
// Protocol listeners
UDPlistener *net.UDPConn
TCPlistener *net.TCPListener

// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn

MaxTCPConnections int `toml:"max_tcp_connections"`

graphiteParser *graphite.GraphiteParser

acc telegraf.Accumulator

MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
}

// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
Expand Down Expand Up @@ -128,10 +163,16 @@ type cachedtimings struct {
}

func (_ *Statsd) Description() string {
return "Statsd Server"
return "Statsd UDP/TCP Server"
}

const sampleConfig = `
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"
## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250
## Address and port to host UDP listener on
service_address = ":8125"
Expand Down Expand Up @@ -247,6 +288,27 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)

s.Lock()
defer s.Unlock()
//
tags := map[string]string{
"address": s.ServiceAddress,
}
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)

s.in = make(chan []byte, s.AllowedPendingMessages)
s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn)
for i := 0; i < s.MaxTCPConnections; i++ {
s.accept <- true
}

if s.ConvertNames {
log.Printf("I! WARNING statsd: convert_names config option is deprecated," +
" please use metric_separator instead")
Expand All @@ -258,31 +320,75 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {

s.wg.Add(2)
// Start the UDP listener
go s.udpListen()
switch s.Protocol {
case "udp":
go s.udpListen()
case "tcp":
go s.tcpListen()
}
// Start the line parser
go s.parser()
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
return nil
}

// tcpListen() starts listening for udp packets on the configured port.
func (s *Statsd) tcpListen() error {
defer s.wg.Done()
// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress)
s.TCPlistener, err = net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenTCP - %s", err)
return err
}
log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String())
for {
select {
case <-s.done:
return nil
default:
// Accept connection:
conn, err := s.TCPlistener.AcceptTCP()
if err != nil {
return err
}

select {
case <-s.accept:
// not over connection limit, handle the connection properly.
s.wg.Add(1)
// generate a random id for this TCPConn
id := internal.RandomString(6)
s.remember(id, conn)
go s.handler(conn, id)
default:
// We are over the connection limit, refuse & close.
s.refuser(conn)
}
}
}
}

// udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error {
defer s.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
s.listener, err = net.ListenUDP("udp", address)
s.UDPlistener, err = net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String())
log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String())

buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
select {
case <-s.done:
return nil
default:
n, _, err := s.listener.ReadFromUDP(buf)
n, _, err := s.UDPlistener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("E! Error READ: %s\n", err.Error())
continue
Expand Down Expand Up @@ -637,20 +743,111 @@ func (s *Statsd) aggregate(m metric) {
}
}

// handler handles a single TCP Connection
func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.CurrentConnections.Incr(1)
s.TotalConnections.Incr(1)
// connection cleanup function
defer func() {
s.wg.Done()
conn.Close()
// Add one connection potential back to channel when this one closes
s.accept <- true
s.forget(id)
s.CurrentConnections.Incr(-1)
}()

var n int
scanner := bufio.NewScanner(conn)
for {
select {
case <-s.done:
return
default:
if !scanner.Scan() {
return
}
n = len(scanner.Bytes())
if n == 0 {
continue
}
s.BytesRecv.Incr(int64(n))
s.PacketsRecv.Incr(1)
bufCopy := make([]byte, n+1)
copy(bufCopy, scanner.Bytes())
bufCopy[n] = '\n'

select {
case s.in <- bufCopy:
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
log.Printf(dropwarn, s.drops)
}
}
}
}
}

// refuser refuses a TCP connection
func (s *Statsd) refuser(conn *net.TCPConn) {
conn.Close()
log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}

// forget a TCP connection
func (s *Statsd) forget(id string) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
delete(s.conns, id)
}

// remember a TCP connection
func (s *Statsd) remember(id string, conn *net.TCPConn) {
s.cleanup.Lock()
defer s.cleanup.Unlock()
s.conns[id] = conn
}

func (s *Statsd) Stop() {
s.Lock()
defer s.Unlock()
log.Println("I! Stopping the statsd service")
close(s.done)
s.listener.Close()
switch s.Protocol {
case "udp":
s.UDPlistener.Close()
case "tcp":
s.TCPlistener.Close()
// Close all open TCP connections
// - get all conns from the s.conns map and put into slice
// - this is so the forget() function doesnt conflict with looping
// over the s.conns map
var conns []*net.TCPConn
s.cleanup.Lock()
for _, conn := range s.conns {
conns = append(conns, conn)
}
s.cleanup.Unlock()
for _, conn := range conns {
conn.Close()
}
default:
s.UDPlistener.Close()
}
s.wg.Wait()
close(s.in)
log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress)
}

func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
Protocol: defaultProtocol,
ServiceAddress: ":8125",
MaxTCPConnections: 250,
MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage,
DeleteCounters: true,
Expand Down
Loading

0 comments on commit f3435f1

Please sign in to comment.