From 21555b2787541758b4ae57a332776d4026b89454 Mon Sep 17 00:00:00 2001 From: Suraj Soni Date: Thu, 27 Oct 2016 00:41:44 -0700 Subject: [PATCH 1/3] decode_json_fields processor --- .gitignore | 1 + CHANGELOG.asciidoc | 1 + filebeat/harvester/reader/json.go | 43 +------- filebeat/tests/system/test_json.py | 4 +- filebeat/tests/system/test_registrar.py | 8 ++ libbeat/common/json_transform.go | 46 ++++++++ .../processors/actions/decode_json_fields.go | 100 ++++++++++++++++++ .../actions/decode_json_fields_test.go | 100 ++++++++++++++++++ .../github.com/elastic/go-ucfg/CHANGELOG.md | 1 + 9 files changed, 260 insertions(+), 44 deletions(-) create mode 100644 libbeat/common/json_transform.go create mode 100644 libbeat/processors/actions/decode_json_fields.go create mode 100644 libbeat/processors/actions/decode_json_fields_test.go diff --git a/.gitignore b/.gitignore index 2da59e27668..6e70e3d842c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ /build /*/data /*/logs +/.vscode # Files .DS_Store diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5c0c411fcee..f7bb5a24838 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff] ==== Bugfixes *Affecting all Beats* +- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605] *Metricbeat* diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 67deebdd5ff..4dec78d9e24 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -69,51 +69,10 @@ func unmarshal(text []byte, fields *map[string]interface{}) error { if err != nil { return err } - transformNumbersDict(*fields) + common.TransformNumbersDict(*fields) return nil } -// transformNumbersDict walks a json decoded tree an replaces json.Number -// with int64, float64, or string, in this order of preference (i.e. if it -// parses as an int, use int. 
if it parses as a float, use float. etc). -func transformNumbersDict(dict common.MapStr) { - for k, v := range dict { - switch vv := v.(type) { - case json.Number: - dict[k] = transformNumber(vv) - case map[string]interface{}: - transformNumbersDict(vv) - case []interface{}: - transformNumbersArray(vv) - } - } -} - -func transformNumber(value json.Number) interface{} { - i64, err := value.Int64() - if err == nil { - return i64 - } - f64, err := value.Float64() - if err == nil { - return f64 - } - return value.String() -} - -func transformNumbersArray(arr []interface{}) { - for i, v := range arr { - switch vv := v.(type) { - case json.Number: - arr[i] = transformNumber(vv) - case map[string]interface{}: - transformNumbersDict(vv) - case []interface{}: - transformNumbersArray(vv) - } - } -} - // Next decodes JSON and returns the filled Line object. func (r *JSON) Next() (Message, error) { message, err := r.reader.Next() diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index a7a36218916..c708292888a 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -261,7 +261,7 @@ def test_with_generic_filtering(self): message_key="message", keys_under_root=True, overwrite_keys=True, - add_error_key=True, + add_error_key=True ), processors=[{ "drop_fields": { @@ -305,7 +305,7 @@ def test_with_generic_filtering_remove_headers(self): message_key="message", keys_under_root=True, overwrite_keys=True, - add_error_key=True, + add_error_key=True ), processors=[{ "drop_fields": { diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index daba757c385..ddc1589ad6f 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -766,6 +766,10 @@ def test_clean_inactive(self): lambda: self.log_contains_count("Registry file updated") > 1, max_timeout=15) + if os.name == "nt": + # On windows registry recreation can take a bit longer + 
time.sleep(1) + data = self.get_registry() assert len(data) == 2 @@ -834,6 +838,10 @@ def test_clean_removed(self): lambda: self.log_contains_count("Registry file updated") > 1, max_timeout=15) + if os.name == "nt": + # On windows registry recreation can take a bit longer + time.sleep(1) + data = self.get_registry() assert len(data) == 2 diff --git a/libbeat/common/json_transform.go b/libbeat/common/json_transform.go new file mode 100644 index 00000000000..2a9e5a6a22c --- /dev/null +++ b/libbeat/common/json_transform.go @@ -0,0 +1,46 @@ +package common + +import ( + "encoding/json" +) + +// transformNumbersDict walks a json decoded tree an replaces json.Number +// with int64, float64, or string, in this order of preference (i.e. if it +// parses as an int, use int. if it parses as a float, use float. etc). +func TransformNumbersDict(dict MapStr) { + for k, v := range dict { + switch vv := v.(type) { + case json.Number: + dict[k] = TransformNumber(vv) + case map[string]interface{}: + TransformNumbersDict(vv) + case []interface{}: + TransformNumbersArray(vv) + } + } +} + +func TransformNumber(value json.Number) interface{} { + i64, err := value.Int64() + if err == nil { + return i64 + } + f64, err := value.Float64() + if err == nil { + return f64 + } + return value.String() +} + +func TransformNumbersArray(arr []interface{}) { + for i, v := range arr { + switch vv := v.(type) { + case json.Number: + arr[i] = TransformNumber(vv) + case map[string]interface{}: + TransformNumbersDict(vv) + case []interface{}: + TransformNumbersArray(vv) + } + } +} diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go new file mode 100644 index 00000000000..58736228e71 --- /dev/null +++ b/libbeat/processors/actions/decode_json_fields.go @@ -0,0 +1,100 @@ +package actions + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +
"github.com/elastic/beats/libbeat/processors" + "github.com/pkg/errors" +) + +type decodeJSONFields struct { + Fields []string +} + +var debug = logp.MakeDebug("filters") + +func init() { + processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, + requireFields("fields"), allowedFields("fields", "when"))) +} + +func newDecodeJSONFields(c common.Config) (processors.Processor, error) { + config := struct { + Fields []string `config:"fields"` + }{} + err := c.Unpack(&config) + if err != nil { + logp.Warn("Error unpacking config for decode_json_fields") + return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) + } + + f := decodeJSONFields{Fields: config.Fields} + return f, nil +} + +func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { + var errs []string + + for _, field := range f.Fields { + data, err := event.GetValue(field) + if err != nil && errors.Cause(err) != common.ErrKeyNotFound { + debug("Error trying to GetValue for field : %s in event : %v", field, event) + errs = append(errs, err.Error()) + continue + } + text, ok := data.(string) + if ok { + var output map[string]interface{} + err := unmarshal([]byte(text), &output) + if err != nil { + debug("Error trying to unmarshal %s", event[field]) + errs = append(errs, err.Error()) + continue + } + + _, err = event.Put(field, output) + if err != nil { + debug("Error trying to Put value %v for field : %s", output, field) + errs = append(errs, err.Error()) + continue + } + } + } + + return event, fmt.Errorf(strings.Join(errs, ", ")) +} + +// unmarshal is equivalent with json.Unmarshal but it converts numbers +// to int64 where possible, instead of using always float64. 
+func unmarshal(text []byte, fields *map[string]interface{}) error { + dec := json.NewDecoder(bytes.NewReader(text)) + dec.UseNumber() + err := dec.Decode(fields) + if err != nil { + return err + } + + //Iterate through all the fields to perform deep parsing + for k, v := range *fields { + switch vv := v.(type) { + case string: + var output map[string]interface{} + sErr := unmarshal([]byte(vv), &output) + if sErr == nil { + (*fields)[k] = output + } + } + } + + common.TransformNumbersDict(*fields) + return nil +} + +func (f decodeJSONFields) String() string { + return "decode_json_fields=" + strings.Join(f.Fields, ", ") +} diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go new file mode 100644 index 00000000000..7c8a7b9633c --- /dev/null +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -0,0 +1,100 @@ +package actions + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" +) + +var fields = [1]string{"msg"} +var config, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, +}) + +func TestMissingKey(t *testing.T) { + input := common.MapStr{ + "pipeline": "us1", + } + + actual := getActualValue(t, config, input) + + expected := common.MapStr{ + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) +} + +func TestFieldNotString(t *testing.T) { + input := common.MapStr{ + "msg": 123, + "pipeline": "us1", + } + + actual := getActualValue(t, config, input) + + expected := common.MapStr{ + "msg": 123, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestInvalidJSON(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", + "pipeline": "us1", + } + + actual := getActualValue(t, config, input) + + expected := common.MapStr{ + "msg": 
"{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", + "pipeline": "us1", + } + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestValidJSON(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + actual := getActualValue(t, config, input) + + expected := common.MapStr{ + "msg": map[string]interface{}{ + "log": map[string]interface{}{ + "level": "info", + }, + "stream": "stderr", + "count": 3, + }, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) + +} + +func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + p, err := newDecodeJSONFields(*config) + if err != nil { + logp.Err("Error initializing decode_json_fields") + t.Fatal(err) + } + + actual, err := p.Run(input) + + return actual +} diff --git a/vendor/github.com/elastic/go-ucfg/CHANGELOG.md b/vendor/github.com/elastic/go-ucfg/CHANGELOG.md index 1d5c8942987..4a53aa362b1 100644 --- a/vendor/github.com/elastic/go-ucfg/CHANGELOG.md +++ b/vendor/github.com/elastic/go-ucfg/CHANGELOG.md @@ -120,6 +120,7 @@ This project adheres to [Semantic Versioning](http://semver.org/). 
[Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.7...HEAD [0.3.7]: https://github.com/elastic/go-ucfg/compare/v0.3.6...v0.3.7 +[Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.6...HEAD [0.3.6]: https://github.com/elastic/go-ucfg/compare/v0.3.5...v0.3.6 [0.3.5]: https://github.com/elastic/go-ucfg/compare/v0.3.4...v0.3.5 [0.3.4]: https://github.com/elastic/go-ucfg/compare/v0.3.3...v0.3.4 From 91b64a882484f0f4fb27506b49ef4d941ff6b3c6 Mon Sep 17 00:00:00 2001 From: Suraj Soni Date: Thu, 10 Nov 2016 11:15:38 -0800 Subject: [PATCH 2/3] Refactored to pull json_transform into its own module --- filebeat/harvester/reader/json.go | 3 ++- .../transform.go} | 24 ++++++++++--------- .../processors/actions/decode_json_fields.go | 3 ++- 3 files changed, 17 insertions(+), 13 deletions(-) rename libbeat/common/{json_transform.go => jsontransform/transform.go} (56%) diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 4dec78d9e24..6157b551db2 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" ) @@ -69,7 +70,7 @@ func unmarshal(text []byte, fields *map[string]interface{}) error { if err != nil { return err } - common.TransformNumbersDict(*fields) + jsontransform.TransformNumbers(*fields) return nil } diff --git a/libbeat/common/json_transform.go b/libbeat/common/jsontransform/transform.go similarity index 56% rename from libbeat/common/json_transform.go rename to libbeat/common/jsontransform/transform.go index 2a9e5a6a22c..a6d065ee430 100644 --- a/libbeat/common/json_transform.go +++ b/libbeat/common/jsontransform/transform.go @@ -1,26 +1,28 @@ -package common +package jsontransform import ( "encoding/json" + + "github.com/elastic/beats/libbeat/common" ) -// transformNumbersDict walks a json decoded tree an replaces
json.Number +// TransformNumbers walks a json decoded tree an replaces json.Number // with int64, float64, or string, in this order of preference (i.e. if it // parses as an int, use int. if it parses as a float, use float. etc). -func TransformNumbersDict(dict MapStr) { +func TransformNumbers(dict common.MapStr) { for k, v := range dict { switch vv := v.(type) { case json.Number: - dict[k] = TransformNumber(vv) + dict[k] = transformNumber(vv) case map[string]interface{}: - TransformNumbersDict(vv) + TransformNumbers(vv) case []interface{}: - TransformNumbersArray(vv) + transformNumbersArray(vv) } } } -func TransformNumber(value json.Number) interface{} { +func transformNumber(value json.Number) interface{} { i64, err := value.Int64() if err == nil { return i64 @@ -32,15 +34,15 @@ func TransformNumber(value json.Number) interface{} { return value.String() } -func TransformNumbersArray(arr []interface{}) { +func transformNumbersArray(arr []interface{}) { for i, v := range arr { switch vv := v.(type) { case json.Number: - arr[i] = TransformNumber(vv) + arr[i] = transformNumber(vv) case map[string]interface{}: - TransformNumbersDict(vv) + TransformNumbers(vv) case []interface{}: - TransformNumbersArray(vv) + transformNumbersArray(vv) } } } diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 58736228e71..2168b5ac6ab 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" "github.com/pkg/errors" @@ -91,7 +92,7 @@ func unmarshal(text []byte, fields *map[string]interface{}) error { } } - common.TransformNumbersDict(*fields) + jsontransform.TransformNumbers(*fields) return nil } From 3e530223e02825dda7ccb7f7f31ec5f993e9cf45 Mon 
Sep 17 00:00:00 2001 From: Suraj Soni Date: Tue, 15 Nov 2016 16:04:52 -0800 Subject: [PATCH 3/3] Refactored decode_json_fields to accommodate feedback --- .../processors/actions/decode_json_fields.go | 109 +++++++++++++----- .../actions/decode_json_fields_test.go | 42 +++++-- 2 files changed, 117 insertions(+), 34 deletions(-) diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 2168b5ac6ab..1eb4167e45e 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -14,34 +14,51 @@ import ( ) type decodeJSONFields struct { - Fields []string + fields []string + maxDepth int + processArray bool } +type config struct { + Fields []string `config:"fields"` + MaxDepth int `config:"maxDepth" validate:"min=1"` + ProcessArray bool `config:"processArray"` +} + +var ( + defaultConfig = config{ + MaxDepth: 1, + ProcessArray: false, + } +) + var debug = logp.MakeDebug("filters") func init() { - processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields, - requireFields("fields"), allowedFields("fields", "when"))) + processors.RegisterPlugin("decode_json_fields", + configChecked(newDecodeJSONFields, + requireFields("fields"), + allowedFields("fields", "maxDepth", "processArray"))) } func newDecodeJSONFields(c common.Config) (processors.Processor, error) { - config := struct { - Fields []string `config:"fields"` - }{} + config := defaultConfig + err := c.Unpack(&config) + if err != nil { logp.Warn("Error unpacking config for decode_json_fields") return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } - f := decodeJSONFields{Fields: config.Fields} + f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray} return f, nil } func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { var errs []string - for _, field := range f.Fields { + for _,
field := range f.fields { data, err := event.GetValue(field) if err != nil && errors.Cause(err) != common.ErrKeyNotFound { debug("Error trying to GetValue for field : %s in event : %v", field, event) @@ -50,8 +67,8 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { } text, ok := data.(string) if ok { - var output map[string]interface{} - err := unmarshal([]byte(text), &output) + var output interface{} + err := unmarshal(f.maxDepth, []byte(text), &output, f.processArray) if err != nil { debug("Error trying to unmarshal %s", event[field]) errs = append(errs, err.Error()) @@ -70,32 +87,70 @@ func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { return event, fmt.Errorf(strings.Join(errs, ", ")) } -// unmarshal is equivalent with json.Unmarshal but it converts numbers -// to int64 where possible, instead of using always float64. -func unmarshal(text []byte, fields *map[string]interface{}) error { - dec := json.NewDecoder(bytes.NewReader(text)) - dec.UseNumber() - err := dec.Decode(fields) - if err != nil { +func unmarshal(maxDepth int, text []byte, fields *interface{}, processArray bool) error { + if err := DecodeJSON(text, fields); err != nil { return err } - //Iterate through all the fields to perform deep parsing - for k, v := range *fields { - switch vv := v.(type) { - case string: - var output map[string]interface{} - sErr := unmarshal([]byte(vv), &output) - if sErr == nil { - (*fields)[k] = output + maxDepth-- + if maxDepth == 0 { + return nil + } + + tryUnmarshal := func(v interface{}) (interface{}, bool) { + str, isString := v.(string) + if !isString { + return v, false + } + + var tmp interface{} + err := unmarshal(maxDepth, []byte(str), &tmp, processArray) + if err != nil { + return v, false + } + + return tmp, true + } + + // try to deep unmarshal fields + switch O := interface{}(*fields).(type) { + case map[string]interface{}: + for k, v := range O { + if decoded, ok := tryUnmarshal(v); ok { + O[k] = decoded 
+ } + } + // We want to process arrays here + case []interface{}: + if !processArray { + break + } + + for i, v := range O { + if decoded, ok := tryUnmarshal(v); ok { + O[i] = decoded } } } + return nil +} + +func DecodeJSON(text []byte, to *interface{}) error { + dec := json.NewDecoder(bytes.NewReader(text)) + dec.UseNumber() + err := dec.Decode(to) + + if err != nil { + return err + } - jsontransform.TransformNumbers(*fields) + switch O := interface{}(*to).(type) { + case map[string]interface{}: + jsontransform.TransformNumbers(O) + } return nil } func (f decodeJSONFields) String() string { - return "decode_json_fields=" + strings.Join(f.Fields, ", ") + return "decode_json_fields=" + strings.Join(f.fields, ", ") } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 7c8a7b9633c..70b0bdd5df6 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -9,8 +9,9 @@ import ( ) var fields = [1]string{"msg"} -var config, _ = common.NewConfigFrom(map[string]interface{}{ - "fields": fields, +var testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, }) func TestMissingKey(t *testing.T) { @@ -18,7 +19,7 @@ func TestMissingKey(t *testing.T) { "pipeline": "us1", } - actual := getActualValue(t, config, input) + actual := getActualValue(t, testConfig, input) expected := common.MapStr{ "pipeline": "us1", @@ -33,7 +34,7 @@ func TestFieldNotString(t *testing.T) { "pipeline": "us1", } - actual := getActualValue(t, config, input) + actual := getActualValue(t, testConfig, input) expected := common.MapStr{ "msg": 123, @@ -50,7 +51,7 @@ func TestInvalidJSON(t *testing.T) { "pipeline": "us1", } - actual := getActualValue(t, config, input) + actual := getActualValue(t, testConfig, input) expected := common.MapStr{ "msg": 
"{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", @@ -60,13 +61,40 @@ func TestInvalidJSON(t *testing.T) { } -func TestValidJSON(t *testing.T) { +func TestValidJSONDepthOne(t *testing.T) { input := common.MapStr{ "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", "pipeline": "us1", } - actual := getActualValue(t, config, input) + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "msg": map[string]interface{}{ + "log": "{\"level\":\"info\"}", + "stream": "stderr", + "count": 3, + }, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestValidJSONDepthTwo(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, + "maxDepth": 2, + }) + + actual := getActualValue(t, testConfig, input) expected := common.MapStr{ "msg": map[string]interface{}{