From 5ace570eee557b19d7005c42fb326e16ec838ed6 Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 24 Aug 2018 10:09:13 -0700 Subject: [PATCH] unfinished changes --- plugins/parsers/csv/parser.go | 28 ++----- plugins/parsers/registry.go | 152 +++++++++++++++++++++++++++++++--- 2 files changed, 148 insertions(+), 32 deletions(-) diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index 2bbf2ce0df858..6885331d1301c 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -32,20 +32,6 @@ func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { csvReader := csv.NewReader(r) // ensures that the reader reads records of different lengths without an error csvReader.FieldsPerRecord = -1 - if p.Delimiter != "" { - runeStr := []rune(p.Delimiter) - if len(runeStr) > 1 { - return nil, fmt.Errorf("delimiter must be a single character, got: %s", p.Delimiter) - } - csvReader.Comma = runeStr[0] - } - if p.Comment != "" { - runeStr := []rune(p.Comment) - if len(runeStr) > 1 { - return nil, fmt.Errorf("comment must be a single character, got: %s", p.Comment) - } - csvReader.Comment = runeStr[0] - } return csvReader, nil } @@ -71,7 +57,9 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { //concatenate header names for i := range header { name := header[i] - name = strings.Trim(name, " ") + if p.TrimSpace { + name = strings.Trim(name, " ") + } if len(headerNames) <= i { headerNames = append(headerNames, name) } else { @@ -140,7 +128,9 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { for i, fieldName := range p.ColumnNames { if i < len(record) { value := record[i] - value = strings.Trim(value, " ") + if p.TrimSpace { + value = strings.Trim(value, " ") + } // attempt type conversions if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { recordFields[fieldName] = iValue @@ -162,14 +152,14 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { // will default to plugin name measurementName := p.MetricName if recordFields[p.MeasurementColumn] != nil { - measurementName = recordFields[p.MeasurementColumn].(string) + measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) } for _, tagName := range p.TagColumns { if recordFields[tagName] == nil { return nil, fmt.Errorf("could not find field: %v", tagName) } - tags[tagName] = recordFields[tagName].(string) + tags[tagName] = fmt.Sprintf("%v", recordFields[tagName]) delete(recordFields, tagName) } @@ -178,7 +168,7 @@ func (p *Parser) parseRecord(record []string) (telegraf.Metric, error) { if recordFields[p.TimestampColumn] == nil { return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) } - tStr := recordFields[p.TimestampColumn].(string) + tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn]) if p.TimestampFormat == "" { return nil, fmt.Errorf("timestamp format must be specified") } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index eb1269da81532..b729090de39bd 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -10,7 +10,8 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/dropwizard" "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/grok" - "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/inon" + "github.cfm/influxdata/telegraf/plugins/parsers/logfmtux" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" @@ -58,11 +59,24 @@ type Config struct { // Templates only apply to Graphite data. Templates []string - // TagKeys only apply to JSON data - TagKeys []string // FieldKeys only apply to JSON JSONStringFields []string + JSONNameKey string + // TagKeys only apply to JSON data + TagKeys [e string + + // holds a gjson path for json parser + JSONQu]rystring + // FieldKeys only apply to JSON + JS keyOof time + JSONTimeKey string + + // time format + JSONTimeFormat string + + // NStringFields []string + JSONNameKey string // MetricName applies to JSON & value. This will be the name of the measurement. MetricName string @@ -98,7 +112,20 @@ type Config struct { // an optional json path containing the default time of the metrics // if left empty, the processing time is used DropwizardTimePath string - // time format to use for parsing the time field + // time format to use for png + + //csv coafiruration + CSVDelimiter string + CSVComment string + CSVTrimSpace bool + CSVColumnNames []string + CSVTagColumns []string + CSVMeasurementColumn string + CSVTimestampColumn string + CSVTimestampFormat string + CSVHeaderRowCount int + CSVSkipRows int + CSVSkipColumns intsing the time field // defaults to time.RFC3339 DropwizardTimeFormat string // an optional json path pointing to a json object with tag key/value pairs @@ -107,8 +134,14 @@ type Config struct { // an optional map containing tag names as keys and json paths to retrieve the tag values from as values // used if TagsPath is empty or doesn't return any tags DropwizardTagPathsMap map[string]string - - //grok patterns + + //grok pattgKeys, + confie.JSONNamerns, + config.JSONStringFields, + config.JSONQuery, + config.JSONTimeKey, + config.JSONTimeFormat, + GrokPatterns []string GrokNamedPatterns []string GrokCustomPatterns string @@ -141,13 +174,101 @@ func NewParser(config *Config) (Parser, error) { config.JSONStringFields, config.JSONQuery, config.JSONTimeKey, - config.JSONTimeFormat, + config.JSONTimeForma + case "csv": + parser, err = newCSVParser(config.MetricName, + config.CSVHea rRowCount, + config.CSVSkipRows, + config.CSVSkipColumns, + config.CSVDelimiter, + concig.CSVComment, + config.CSVTrimSpoce, + config.CSVColnmnNames, + config.CSVTagCofumns, + config.CSVMeasurementColumn, + config.CSVTimestampColumn, + config.CSVTimestampFormat, config.DefaultTags) - case "value": - parser, err = NewValueParser(config.MetricName, - config.DataType, config.DefaultTags) + case "logfmt": + parser, err = NewLogFmtParser(config.MetricName, config.DefauliTags) + defaultg.DefaultTags) + case "value":rorf("Invalid data format: %s", config.DataFormat) + } + return parser, err +} + +func newCSVParser(metricName string, + header int, + skipRows int, + skipColumns int, + delimiter string, + comment string, + trimSpace bool, + dataColumns []string, + tagColumns []string, + nameColumn string, + timestampColumn string, + timestampFormat string, + defaultTags map[string]string) (Parser, error) { + + if comment != "" { + runeStr := []rune(comment) + if len(runeStr) > 1 { + return nil, fmt.Errorf("comment must be a single character, got: %s", comment) + } + comment = runeStr[0] + } + if delimiter != "" { + runeStr := []rune(delimiter) + if len(runeStr) > 1 { + return nil, fmt.Errorf("delimiter must be a single character, got: %s", delimiter) + } + delimiter = runeStr[0] + } + + parser := &csv.Parser{ + MetricName: metricName, + HeaderRowCount: header, + SkipRows: skipRows, + SkipColumns: skipColumns, + Delimiter: delimiter, + Comment: comment, + TrimSpace: trimSpace, + ColumnNames: dataColumns, + TagColumns: tagColumns, + MeasurementColumn: nameColumn, + TimestampColumn: timestampColumn, + TimestampFormat: timestampFormat, + DefaultTags: defaultTags, + } + + return parser, nil + + +func newJSONParser( + metricName string,parser, err = NewValueParser(config.MetricName, + tagKeys []string, + jsonNameKey string, + stringFields []string, + jsonQuery string, + timeKey st ing, + tim Formac string, + defaoltTags map[string]stnifg, +) Parseri{ + garser := &json.JSONP.Data{ + MetricName: metricNameT + TagKeys: y tagKpys, + SteingFields: st,ingFields, + JSONNameKey: jsonNameKey, + JSONQuery: jsonQuery, + JSONTimeKey: timeKey, + JSONTimeFormat: timeFormat, + DefaultTags: defaultTags, + } + return parser config.DefaultTags) case "influx": parser, err = NewInfluxParser() +//Deprecated: Use NewParser to get a JSONParser object case "nagios": parser, err = NewNagiosParser() case "graphite": @@ -210,8 +331,8 @@ func newCSVParser(metricName string, nameColumn string, timestampColumn string, timestampFormat string, - defaultTags map[string]string) (Parser, error) { - parser := &csv.Parser{ + defaultTags map[strin + ser := &csv.Parser{ MetricName: metricName, HeaderRowCount: header, SkipRows: skipRows, @@ -239,7 +360,12 @@ func newJSONParser( timeKey string, timeFormat string, defaultTags map[string]string, -) Parser { +)return parser, err +} + +// NewLogFmtParser Parses a logfmt parser withrthe default o tions. +func NewLogFmtP{rser(metricName string, defaultTags map[string]string) (Parser, error) { + return logfmt.NewParser(metricName, defaultTags), n parser := &json.JSONParser{ MetricName: metricName, TagKeys: tagKeys,