-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TCP listener for statsd input #2293
Merged
Merged
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
6623b4b
test for disable tag sorting
szibis 728ba11
Add tcp listener to statsd input
szibis d0ab7cd
Readme and example config updates
szibis 4282b4a
Adding default values for new arguments in doc
szibis 589e4db
Sefl stats with tcp information in name
szibis d8b20e8
Add TCP listener tests for statsd input
szibis 4be8bf0
Revert "test for disable tag sorting"
szibis 123428d
Merge remote-tracking branch 'upstream/master'
szibis a1f11b7
Add TCP listenedr for statsd PR to Changelog
szibis 07cdccb
Merge remote-tracking branch 'upstream/master'
szibis a104281
Merge remote-tracking branch 'upstream/master'
szibis 12614ca
Move changelog line from 1.3 to 1.4 info
szibis b0b459e
test for disable tag sorting
szibis f41b247
Add tcp listener to statsd input
szibis a8121e6
Readme and example config updates
szibis 4c13754
Adding default values for new arguments in doc
szibis c2e0aa0
Sefl stats with tcp information in name
szibis aba92e3
Add TCP listener tests for statsd input
szibis 228cf6a
Revert "test for disable tag sorting"
szibis dc1d190
Add TCP listenedr for statsd PR to Changelog
szibis 60ad86d
Move changelog line from 1.3 to 1.4 info
szibis 69f21fc
Merge branch 'master' of github.com:szibis/telegraf
szibis e595a04
Merge remote-tracking branch 'upstream/master'
szibis 5dc888f
Changelog update
szibis 7eb4a0f
Just closing connection on refuse with log info
szibis 7285190
Fix test by removing info in return on refuse
szibis d7095f9
Fix test by removing info in return on refuse - remove unused variables
szibis e7b4c32
Merge remote-tracking branch 'upstream/master'
szibis 036fe5d
remove duplicate line in changelog
szibis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package statsd | ||
|
||
import ( | ||
"bufio" | ||
"errors" | ||
"fmt" | ||
"log" | ||
|
@@ -14,7 +15,9 @@ import ( | |
"github.com/influxdata/telegraf/plugins/parsers/graphite" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/selfstat" | ||
) | ||
|
||
const ( | ||
|
@@ -24,15 +27,24 @@ const ( | |
|
||
defaultFieldName = "value" | ||
|
||
defaultProtocol = "udp" | ||
|
||
defaultSeparator = "_" | ||
defaultAllowPendingMessage = 10000 | ||
MaxTCPConnections = 250 | ||
) | ||
|
||
var dropwarn = "E! Error: statsd message queue full. " + | ||
"We have dropped %d messages so far. " + | ||
"You may want to increase allowed_pending_messages in the config\n" | ||
|
||
var malformedwarn = "E! Statsd over TCP has received %d malformed packets" + | ||
" thus far." | ||
|
||
type Statsd struct { | ||
// Protocol used on listener - udp or tcp | ||
Protocol string `toml:"protocol"` | ||
|
||
// Address & Port to serve from | ||
ServiceAddress string | ||
|
||
|
@@ -64,9 +76,17 @@ type Statsd struct { | |
UDPPacketSize int `toml:"udp_packet_size"` | ||
|
||
sync.Mutex | ||
wg sync.WaitGroup | ||
// Lock for preventing a data race during resource cleanup | ||
cleanup sync.Mutex | ||
wg sync.WaitGroup | ||
// accept channel tracks how many active connections there are, if there | ||
// is an available bool in accept, then we are below the maximum and can | ||
// accept the connection | ||
accept chan bool | ||
// drops tracks the number of dropped metrics. | ||
drops int | ||
// malformed tracks the number of malformed packets | ||
malformed int | ||
|
||
// Channel for all incoming statsd packets | ||
in chan []byte | ||
|
@@ -83,9 +103,24 @@ type Statsd struct { | |
// bucket -> influx templates | ||
Templates []string | ||
|
||
listener *net.UDPConn | ||
// Protocol listeners | ||
UDPlistener *net.UDPConn | ||
TCPlistener *net.TCPListener | ||
|
||
// track current connections so we can close them in Stop() | ||
conns map[string]*net.TCPConn | ||
|
||
MaxTCPConnections int `toml:"max_tcp_connections"` | ||
|
||
graphiteParser *graphite.GraphiteParser | ||
|
||
acc telegraf.Accumulator | ||
|
||
MaxConnections selfstat.Stat | ||
CurrentConnections selfstat.Stat | ||
TotalConnections selfstat.Stat | ||
PacketsRecv selfstat.Stat | ||
BytesRecv selfstat.Stat | ||
} | ||
|
||
// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate> | ||
|
@@ -128,10 +163,16 @@ type cachedtimings struct { | |
} | ||
|
||
func (_ *Statsd) Description() string { | ||
return "Statsd Server" | ||
return "Statsd UDP/TCP Server" | ||
} | ||
|
||
const sampleConfig = ` | ||
## Protocol, must be "tcp" or "udp" (default=udp) | ||
protocol = "udp" | ||
|
||
## MaxTCPConnection - applicable when protocol is set to tcp (default=250) | ||
max_tcp_connections = 250 | ||
|
||
## Address and port to host UDP listener on | ||
service_address = ":8125" | ||
|
||
|
@@ -247,6 +288,27 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { | |
s.sets = make(map[string]cachedset) | ||
s.timings = make(map[string]cachedtimings) | ||
|
||
s.Lock() | ||
defer s.Unlock() | ||
// | ||
tags := map[string]string{ | ||
"address": s.ServiceAddress, | ||
} | ||
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) | ||
s.MaxConnections.Set(int64(s.MaxTCPConnections)) | ||
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) | ||
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) | ||
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) | ||
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) | ||
|
||
s.in = make(chan []byte, s.AllowedPendingMessages) | ||
s.done = make(chan struct{}) | ||
s.accept = make(chan bool, s.MaxTCPConnections) | ||
s.conns = make(map[string]*net.TCPConn) | ||
for i := 0; i < s.MaxTCPConnections; i++ { | ||
s.accept <- true | ||
} | ||
|
||
if s.ConvertNames { | ||
log.Printf("I! WARNING statsd: convert_names config option is deprecated," + | ||
" please use metric_separator instead") | ||
|
@@ -258,31 +320,75 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { | |
|
||
s.wg.Add(2) | ||
// Start the UDP listener | ||
go s.udpListen() | ||
switch s.Protocol { | ||
case "udp": | ||
go s.udpListen() | ||
case "tcp": | ||
go s.tcpListen() | ||
} | ||
// Start the line parser | ||
go s.parser() | ||
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) | ||
return nil | ||
} | ||
|
||
// tcpListen() starts listening for udp packets on the configured port. | ||
func (s *Statsd) tcpListen() error { | ||
defer s.wg.Done() | ||
// Start listener | ||
var err error | ||
address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress) | ||
s.TCPlistener, err = net.ListenTCP("tcp", address) | ||
if err != nil { | ||
log.Fatalf("ERROR: ListenTCP - %s", err) | ||
return err | ||
} | ||
log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String()) | ||
for { | ||
select { | ||
case <-s.done: | ||
return nil | ||
default: | ||
// Accept connection: | ||
conn, err := s.TCPlistener.AcceptTCP() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
select { | ||
case <-s.accept: | ||
// not over connection limit, handle the connection properly. | ||
s.wg.Add(1) | ||
// generate a random id for this TCPConn | ||
id := internal.RandomString(6) | ||
s.remember(id, conn) | ||
go s.handler(conn, id) | ||
default: | ||
// We are over the connection limit, refuse & close. | ||
s.refuser(conn) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// udpListen starts listening for udp packets on the configured port. | ||
func (s *Statsd) udpListen() error { | ||
defer s.wg.Done() | ||
var err error | ||
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) | ||
s.listener, err = net.ListenUDP("udp", address) | ||
s.UDPlistener, err = net.ListenUDP("udp", address) | ||
if err != nil { | ||
log.Fatalf("ERROR: ListenUDP - %s", err) | ||
} | ||
log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String()) | ||
log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String()) | ||
|
||
buf := make([]byte, UDP_MAX_PACKET_SIZE) | ||
for { | ||
select { | ||
case <-s.done: | ||
return nil | ||
default: | ||
n, _, err := s.listener.ReadFromUDP(buf) | ||
n, _, err := s.UDPlistener.ReadFromUDP(buf) | ||
if err != nil && !strings.Contains(err.Error(), "closed network") { | ||
log.Printf("E! Error READ: %s\n", err.Error()) | ||
continue | ||
|
@@ -636,20 +742,115 @@ func (s *Statsd) aggregate(m metric) { | |
} | ||
} | ||
|
||
// handler handles a single TCP Connection | ||
func (s *Statsd) handler(conn *net.TCPConn, id string) { | ||
s.CurrentConnections.Incr(1) | ||
s.TotalConnections.Incr(1) | ||
// connection cleanup function | ||
defer func() { | ||
s.wg.Done() | ||
conn.Close() | ||
// Add one connection potential back to channel when this one closes | ||
s.accept <- true | ||
s.forget(id) | ||
s.CurrentConnections.Incr(-1) | ||
}() | ||
|
||
var n int | ||
scanner := bufio.NewScanner(conn) | ||
for { | ||
select { | ||
case <-s.done: | ||
return | ||
default: | ||
if !scanner.Scan() { | ||
return | ||
} | ||
n = len(scanner.Bytes()) | ||
if n == 0 { | ||
continue | ||
} | ||
s.BytesRecv.Incr(int64(n)) | ||
s.PacketsRecv.Incr(1) | ||
bufCopy := make([]byte, n+1) | ||
copy(bufCopy, scanner.Bytes()) | ||
bufCopy[n] = '\n' | ||
|
||
select { | ||
case s.in <- bufCopy: | ||
default: | ||
s.drops++ | ||
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { | ||
log.Printf(dropwarn, s.drops) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// refuser refuses a TCP connection | ||
func (s *Statsd) refuser(conn *net.TCPConn) { | ||
// Tell the connection why we are closing. | ||
fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+ | ||
" reached, closing.\nYou may want to increase max_tcp_connections in"+ | ||
" the Telegraf tcp listener configuration.\n", s.MaxTCPConnections) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am assuming this is not part of the statsd protocol, any chance this will confuse a tcp statsd implementation? Maybe we should just close the connection? |
||
conn.Close() | ||
log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr()) | ||
log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" + | ||
" adjust max_tcp_connections") | ||
} | ||
|
||
// forget a TCP connection | ||
func (s *Statsd) forget(id string) { | ||
s.cleanup.Lock() | ||
defer s.cleanup.Unlock() | ||
delete(s.conns, id) | ||
} | ||
|
||
// remember a TCP connection | ||
func (s *Statsd) remember(id string, conn *net.TCPConn) { | ||
s.cleanup.Lock() | ||
defer s.cleanup.Unlock() | ||
s.conns[id] = conn | ||
} | ||
|
||
func (s *Statsd) Stop() { | ||
s.Lock() | ||
defer s.Unlock() | ||
log.Println("I! Stopping the statsd service") | ||
close(s.done) | ||
s.listener.Close() | ||
switch s.Protocol { | ||
case "udp": | ||
s.UDPlistener.Close() | ||
case "tcp": | ||
s.TCPlistener.Close() | ||
// Close all open TCP connections | ||
// - get all conns from the s.conns map and put into slice | ||
// - this is so the forget() function doesnt conflict with looping | ||
// over the s.conns map | ||
var conns []*net.TCPConn | ||
s.cleanup.Lock() | ||
for _, conn := range s.conns { | ||
conns = append(conns, conn) | ||
} | ||
s.cleanup.Unlock() | ||
for _, conn := range conns { | ||
conn.Close() | ||
} | ||
default: | ||
s.UDPlistener.Close() | ||
} | ||
s.wg.Wait() | ||
close(s.in) | ||
log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress) | ||
} | ||
|
||
func init() { | ||
inputs.Add("statsd", func() telegraf.Input { | ||
return &Statsd{ | ||
Protocol: defaultProtocol, | ||
ServiceAddress: ":8125", | ||
MaxTCPConnections: 250, | ||
MetricSeparator: "_", | ||
AllowedPendingMessages: defaultAllowPendingMessage, | ||
DeleteCounters: true, | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this line, looks a leftover from a rebase.