Skip to content

Commit

Permalink
Add csv parser (influxdata#4439)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxunt authored and otherpirate committed Mar 15, 2019
1 parent 32ae433 commit a9c81cd
Show file tree
Hide file tree
Showing 5 changed files with 733 additions and 20 deletions.
122 changes: 102 additions & 20 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Grok](#grok)
1. [Logfmt](#logfmt)
1. [Wavefront](#wavefront)
1. [CSV](#csv)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -107,28 +108,28 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
The JSON data format supports specifying "tag_keys", "string_keys", and "json_query".
If specified, keys in "tag_keys" and "string_keys" will be searched for in the root-level
and any nested lists of the JSON blob. All int and float values are added to fields by default.
If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics.
If "string_keys" is specified, the string will be added as a field.

The "json_query" configuration is a gjson path to an JSON object or
list of JSON objects. If this path leads to an array of values or
single data point an error will be thrown. If this configuration
The "json_query" configuration is a gjson path to an JSON object or
list of JSON objects. If this path leads to an array of values or
single data point an error will be thrown. If this configuration
is specified, only the result of the query will be parsed and returned as metrics.

The "json_name_key" configuration specifies the key of the field whos value will be
added as the metric name.

Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Object paths are specified using gjson path format, which is denoted by object keys
concatenated with "." to go deeper in nested JSON objects.
Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax

The JSON data format also supports extracting time values through the
config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
The JSON data format also supports extracting time values through the
config "json_time_key" and "json_time_format". If "json_time_key" is set,
"json_time_format" must be specified. The "json_time_key" describes the
name of the field containing time information. The "json_time_format"
must be a recognized Go time format.
If there is no year provided, the metrics will have the current year.
More info on time formats can be found here: https://golang.org/pkg/time/#Parse
Expand Down Expand Up @@ -161,8 +162,8 @@ For example, if you had this configuration:
## List of field names to extract from JSON and add as string fields
# json_string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax
# json_query = ""

Expand Down Expand Up @@ -191,8 +192,8 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
be output with the same timestamp.

For example, if the following configuration:
Expand Down Expand Up @@ -220,7 +221,7 @@ For example, if the following configuration:
## List of field names to extract from JSON and add as string fields
# string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
# json_query = ""

Expand Down Expand Up @@ -264,7 +265,7 @@ exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000
```

If you want to only use a specific portion of your JSON, use the "json_query"
If you want to only use a specific portion of your JSON, use the "json_query"
configuration to specify a path to a JSON object.

For example, with the following config:
Expand All @@ -288,7 +289,7 @@ For example, with the following config:
## List of field names to extract from JSON and add as string fields
string_fields = ["last"]

## gjson query path to specify a specific chunk of JSON to be parsed with
## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed
json_query = "obj.friends"

Expand Down Expand Up @@ -1038,3 +1039,84 @@ There are no additional configuration options for Wavefront Data Format line-pro
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "wavefront"
```

# CSV
Parse out metrics from a CSV formatted table. By default, the parser assumes there is no header and
will read data from the first line. If `csv_header_row_count` is set to anything besides 0, the parser
will extract column names from the first number of rows. Headers of more than 1 row will have their
names concatenated together. Any unnamed columns will be ignored by the parser.

The `csv_skip_rows` config indicates the number of rows to skip before looking for header information or data
to parse. By default, no rows will be skipped.

The `csv_skip_columns` config indicates the number of columns to be skipped before parsing data. These
columns will not be read out of the header. Naming with the `csv_column_names` will begin at the first
parsed column after skipping the indicated columns. By default, no columns are skipped.

To assign custom column names, the `csv_column_names` config is available. If the `csv_column_names`
config is used, all columns must be named as additional columns will be ignored. If `csv_header_row_count`
is set to 0, `csv_column_names` must be specified. Names listed in `csv_column_names` will override names extracted
from the header.

The `csv_tag_columns` and `csv_field_columns` configs are available to add the column data to the metric.
The name used to specify the column is the name in the header, or if specified, the corresponding
name assigned in `csv_column_names`. If neither config is specified, no data will be added to the metric.

Additional configs are available to dynamically name metrics and set custom timestamps. If the
`csv_column_names` config is specified, the parser will assign the metric name to the value found
in that column. If the `csv_timestamp_column` is specified, the parser will extract the timestamp from
that column. If `csv_timestamp_column` is specified, the `csv_timestamp_format` must also be specified
or an error will be thrown.

#### CSV Configuration
```toml
data_format = "csv"

## Indicates how many rows to treat as a header. By default, the parser assumes
## there is no header and will parse the first row as data. If set to anything more
## than 1, column names will be concatenated with the name listed in the next header row.
## If `csv_column_names` is specified, the column names in header will be overridden.
# csv_header_row_count = 0

## Indicates the number of rows to skip before looking for header information.
# csv_skip_rows = 0

## Indicates the number of columns to skip before looking for data to parse.
## These columns will be skipped in the header as well.
# csv_skip_columns = 0

## The seperator between csv fields
## By default, the parser assumes a comma (",")
# csv_delimiter = ","

## The character reserved for marking a row as a comment row
## Commented rows are skipped and not parsed
# csv_comment = ""

## If set to true, the parser will remove leading whitespace from fields
## By default, this is false
# csv_trim_space = false

## For assigning custom names to columns
## If this is specified, all columns should have a name
## Unnamed columns will be ignored by the parser.
## If `csv_header_row_count` is set to 0, this config must be used
csv_column_names = []

## Columns listed here will be added as tags. Any other columns
## will be added as fields.
csv_tag_columns = []

## The column to extract the name of the metric from
## By default, this is the name of the plugin
## the `name_override` config overrides this
# csv_measurement_column = ""

## The column to extract time information for the metric
## `csv_timestamp_format` must be specified if this is used
# csv_timestamp_column = ""

## The format of time data extracted from `csv_timestamp_column`
## this must be specified if `csv_timestamp_column` is specified
# csv_timestamp_format = ""
```
122 changes: 122 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,120 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

//for csv parser
if node, ok := tbl.Fields["csv_column_names"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVColumnNames = append(c.CSVColumnNames, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_tag_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.CSVTagColumns = append(c.CSVTagColumns, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["csv_delimiter"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVDelimiter = str.Value
}
}
}

if node, ok := tbl.Fields["csv_comment"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVComment = str.Value
}
}
}

if node, ok := tbl.Fields["csv_measurement_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVMeasurementColumn = str.Value
}
}
}

if node, ok := tbl.Fields["csv_timestamp_column"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampColumn = str.Value
}
}
}

if node, ok := tbl.Fields["csv_timestamp_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.CSVTimestampFormat = str.Value
}
}
}

if node, ok := tbl.Fields["csv_header_row_count"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVHeaderRowCount = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_skip_rows"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipRows = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_skip_columns"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
iVal, err := strconv.Atoi(str.Value)
c.CSVSkipColumns = iVal
if err != nil {
return nil, fmt.Errorf("E! parsing to int: %v", err)
}
}
}
}

if node, ok := tbl.Fields["csv_trim_space"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.Boolean); ok {
//for config with no quotes
val, err := strconv.ParseBool(str.Value)
c.CSVTrimSpace = val
if err != nil {
return nil, fmt.Errorf("E! parsing to bool: %v", err)
}
}
}
}

c.MetricName = name

delete(tbl.Fields, "data_format")
Expand All @@ -1469,6 +1583,14 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "grok_custom_patterns")
delete(tbl.Fields, "grok_custom_pattern_files")
delete(tbl.Fields, "grok_timezone")
delete(tbl.Fields, "csv_data_columns")
delete(tbl.Fields, "csv_tag_columns")
delete(tbl.Fields, "csv_field_columns")
delete(tbl.Fields, "csv_name_column")
delete(tbl.Fields, "csv_timestamp_column")
delete(tbl.Fields, "csv_timestamp_format")
delete(tbl.Fields, "csv_delimiter")
delete(tbl.Fields, "csv_header")

return parsers.NewParser(c)
}
Expand Down
Loading

0 comments on commit a9c81cd

Please sign in to comment.