Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue with batch if a metric is unserializable in InfluxDB UDP output #4534

Merged
merged 1 commit into from
Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions plugins/outputs/influxdb/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package influxdb
import (
"context"
"fmt"
"log"
"net"
"net/url"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)

Expand All @@ -28,7 +28,7 @@ type Conn interface {
type UDPConfig struct {
MaxPayloadSize int
URL *url.URL
Serializer serializers.Serializer
Serializer *influx.Serializer
Dialer Dialer
}

Expand Down Expand Up @@ -65,7 +65,7 @@ func NewUDPClient(config *UDPConfig) (*udpClient, error) {
type udpClient struct {
conn Conn
dialer Dialer
serializer serializers.Serializer
serializer *influx.Serializer
url *url.URL
}

Expand All @@ -89,7 +89,11 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
for _, metric := range metrics {
octets, err := c.serializer.Serialize(metric)
if err != nil {
return fmt.Errorf("could not serialize metric: %v", err)
// Since we are serializing multiple metrics, don't fail the
// entire batch just because of one unserializable metric.
log.Printf("E! [outputs.influxdb] when writing to [%s] could not serialize metric: %v",
c.URL(), err)
continue
}

_, err = c.conn.Write(octets)
Expand Down
90 changes: 59 additions & 31 deletions plugins/outputs/influxdb/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"log"
"net"
"net/url"
"sync"
Expand All @@ -13,7 +14,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/influxdb"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -65,19 +65,6 @@ func (d *MockDialer) DialContext(ctx context.Context, network string, address st
return d.DialContextF(network, address)
}

type MockSerializer struct {
SerializeF func(metric telegraf.Metric) ([]byte, error)
SerializeBatchF func(metrics []telegraf.Metric) ([]byte, error)
}

func (s *MockSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeF(metric)
}

func (s *MockSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return s.SerializeBatchF(metrics)
}

func TestUDP_NewUDPClientNoURL(t *testing.T) {
config := &influxdb.UDPConfig{}
_, err := influxdb.NewUDPClient(config)
Expand Down Expand Up @@ -177,28 +164,69 @@ func TestUDP_WriteError(t *testing.T) {
require.True(t, closed)
}

func TestUDP_SerializeError(t *testing.T) {
config := &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{}
return conn, nil
func TestUDP_ErrorLogging(t *testing.T) {
tests := []struct {
name string
config *influxdb.UDPConfig
metrics []telegraf.Metric
logContains string
}{
{
name: "logs need more space",
config: &influxdb.UDPConfig{
MaxPayloadSize: 1,
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{}
return conn, nil
},
},
},
metrics: []telegraf.Metric{getMetric()},
logContains: `could not serialize metric: "cpu": need more space`,
},
Serializer: &MockSerializer{
SerializeF: func(metric telegraf.Metric) ([]byte, error) {
return nil, influx.ErrNeedMoreSpace
{
name: "logs series name",
config: &influxdb.UDPConfig{
URL: getURL(),
Dialer: &MockDialer{
DialContextF: func(network, address string) (influxdb.Conn, error) {
conn := &MockConn{}
return conn, nil
},
},
},
metrics: []telegraf.Metric{
func() telegraf.Metric {
metric, _ := metric.New(
"cpu",
map[string]string{
"host": "example.org",
},
map[string]interface{}{},
time.Unix(0, 0),
)
return metric
}(),
},
logContains: `could not serialize metric: "cpu,host=example.org": no serializable fields`,
},
}
client, err := influxdb.NewUDPClient(config)
require.NoError(t, err)

ctx := context.Background()
err = client.Write(ctx, []telegraf.Metric{getMetric()})
require.Error(t, err)
require.Contains(t, err.Error(), influx.ErrNeedMoreSpace.Error())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var b bytes.Buffer
log.SetOutput(&b)

client, err := influxdb.NewUDPClient(tt.config)
require.NoError(t, err)

ctx := context.Background()
err = client.Write(ctx, tt.metrics)
require.NoError(t, err)
require.Contains(t, b.String(), tt.logContains)
})
}
}

func TestUDP_WriteWithRealConn(t *testing.T) {
Expand Down
73 changes: 41 additions & 32 deletions plugins/serializers/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,34 @@ const (
UintSupport FieldTypeSupport = 1 << iota
)

// MetricError is an error causing a metric to be unserializable.
var (
NeedMoreSpace = "need more space"
InvalidName = "invalid name"
NoFields = "no serializable fields"
)

// MetricError is an error causing an entire metric to be unserializable.
type MetricError struct {
s string
series string
reason string
}

func (e MetricError) Error() string {
return e.s
if e.series != "" {
return fmt.Sprintf("%q: %s", e.series, e.reason)
}
return e.reason
}

// FieldError is an error causing a field to be unserializable.
type FieldError struct {
s string
reason string
}

func (e FieldError) Error() string {
return e.s
return e.reason
}

var (
ErrNeedMoreSpace = &MetricError{"need more space"}
ErrInvalidName = &MetricError{"invalid name"}
ErrNoFields = &MetricError{"no serializable fields"}
)

// Serializer is a serializer for line protocol.
type Serializer struct {
maxLineBytes int
Expand Down Expand Up @@ -102,17 +106,20 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
return out, nil
}

// SerializeBatch writes the slice of metrics and returns a byte slice of the
// results. The returned byte slice may contain multiple lines of data.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
s.buf.Reset()
for _, m := range metrics {
_, err := s.Write(&batch, m)
_, err := s.Write(&s.buf, m)
if err != nil {
return nil, err
}
}
return batch.Bytes(), nil
out := make([]byte, s.buf.Len())
copy(out, s.buf.Bytes())
return out, nil
}

func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m)
return s.bytesWritten, err
Expand All @@ -135,7 +142,7 @@ func (s *Serializer) buildHeader(m telegraf.Metric) error {

name := nameEscape(m.Name())
if name == "" {
return ErrInvalidName
return s.newMetricError(InvalidName)
}

s.header = append(s.header, name...)
Expand Down Expand Up @@ -222,31 +229,23 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
}

if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
// Need at least one field per line
// Need at least one field per line, this metric cannot be fit
// into the max line bytes.
if firstField {
return ErrNeedMoreSpace
return s.newMetricError(NeedMoreSpace)
}

err = s.write(w, s.footer)
if err != nil {
return err
}

firstField = true
bytesNeeded = len(s.header) + len(s.pair) + len(s.footer)

if s.maxLineBytes > 0 && bytesNeeded > s.maxLineBytes {
return ErrNeedMoreSpace
if bytesNeeded > s.maxLineBytes {
return s.newMetricError(NeedMoreSpace)
}

err = s.write(w, s.header)
if err != nil {
return err
}

s.write(w, s.pair)
pairsLen += len(s.pair)
firstField = false
continue
}

if firstField {
Expand All @@ -261,18 +260,28 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
}
}

s.write(w, s.pair)
err = s.write(w, s.pair)
if err != nil {
return err
}

pairsLen += len(s.pair)
firstField = false
}

if firstField {
return ErrNoFields
return s.newMetricError(NoFields)
}

return s.write(w, s.footer)
}

func (s *Serializer) newMetricError(reason string) *MetricError {
if len(s.header) != 0 {
series := bytes.TrimRight(s.header, " ")
return &MetricError{series: string(series), reason: reason}
}
return &MetricError{reason: reason}
}

func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
Expand Down
15 changes: 9 additions & 6 deletions plugins/serializers/influx/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var tests = []struct {
typeSupport FieldTypeSupport
input telegraf.Metric
output []byte
err error
errReason string
}{
{
name: "minimal",
Expand Down Expand Up @@ -98,7 +98,7 @@ var tests = []struct {
time.Unix(0, 0),
),
),
err: ErrNoFields,
errReason: NoFields,
},
{
name: "float Inf",
Expand Down Expand Up @@ -333,8 +333,8 @@ var tests = []struct {
time.Unix(1519194109, 42),
),
),
output: nil,
err: ErrNeedMoreSpace,
output: nil,
errReason: NeedMoreSpace,
},
{
name: "no fields",
Expand All @@ -346,7 +346,7 @@ var tests = []struct {
time.Unix(0, 0),
),
),
err: ErrNoFields,
errReason: NoFields,
},
{
name: "procstat",
Expand Down Expand Up @@ -427,7 +427,10 @@ func TestSerializer(t *testing.T) {
serializer.SetFieldSortOrder(SortFields)
serializer.SetFieldTypeSupport(tt.typeSupport)
output, err := serializer.Serialize(tt.input)
require.Equal(t, tt.err, err)
if tt.errReason != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tt.errReason)
}
require.Equal(t, string(tt.output), string(output))
})
}
Expand Down
13 changes: 3 additions & 10 deletions plugins/serializers/influx/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package influx

import (
"bytes"
"fmt"
"io"
"log"

Expand Down Expand Up @@ -54,17 +53,11 @@ func (r *reader) Read(p []byte) (int, error) {
r.offset += 1
if err != nil {
r.buf.Reset()
switch err.(type) {
case *MetricError:
// Since we are serializing an array of metrics, don't fail
if err != nil {
// Since we are serializing multiple metrics, don't fail the
// the entire batch just because of one unserializable metric.
log.Printf(
"D! [serializers.influx] could not serialize metric %q: %v; discarding metric",
metric.Name(), err)
log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
continue
default:
fmt.Println(err)
return 0, err
}
}
break
Expand Down