diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 17cc560851f..ef625667616 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff] ==== Bugfixes *Affecting all Beats* +- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605] *Metricbeat* diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 67deebdd5ff..6157b551db2 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" ) @@ -69,51 +70,10 @@ func unmarshal(text []byte, fields *map[string]interface{}) error { if err != nil { return err } - transformNumbersDict(*fields) + jsontransform.TransformNumbers(*fields) return nil } -// transformNumbersDict walks a json decoded tree an replaces json.Number -// with int64, float64, or string, in this order of preference (i.e. if it -// parses as an int, use int. if it parses as a float, use float. etc). 
-func transformNumbersDict(dict common.MapStr) { - for k, v := range dict { - switch vv := v.(type) { - case json.Number: - dict[k] = transformNumber(vv) - case map[string]interface{}: - transformNumbersDict(vv) - case []interface{}: - transformNumbersArray(vv) - } - } -} - -func transformNumber(value json.Number) interface{} { - i64, err := value.Int64() - if err == nil { - return i64 - } - f64, err := value.Float64() - if err == nil { - return f64 - } - return value.String() -} - -func transformNumbersArray(arr []interface{}) { - for i, v := range arr { - switch vv := v.(type) { - case json.Number: - arr[i] = transformNumber(vv) - case map[string]interface{}: - transformNumbersDict(vv) - case []interface{}: - transformNumbersArray(vv) - } - } -} - // Next decodes JSON and returns the filled Line object. func (r *JSON) Next() (Message, error) { message, err := r.reader.Next() diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index a7a36218916..c708292888a 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -261,7 +261,7 @@ def test_with_generic_filtering(self): message_key="message", keys_under_root=True, overwrite_keys=True, - add_error_key=True, + add_error_key=True ), processors=[{ "drop_fields": { @@ -305,7 +305,7 @@ def test_with_generic_filtering_remove_headers(self): message_key="message", keys_under_root=True, overwrite_keys=True, - add_error_key=True, + add_error_key=True ), processors=[{ "drop_fields": { diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index c2ea05abf14..74f2727e524 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -766,6 +766,10 @@ def test_clean_inactive(self): lambda: self.log_contains_count("Registry file updated") > 1, max_timeout=15) + if os.name == "nt": + # On windows registry recreation can take a bit longer + time.sleep(1) + data = self.get_registry() assert 
len(data) == 2 @@ -834,6 +838,10 @@ def test_clean_removed(self): lambda: self.log_contains_count("Registry file updated") > 1, max_timeout=15) + if os.name == "nt": + # On windows registry recreation can take a bit longer + time.sleep(1) + data = self.get_registry() assert len(data) == 2 diff --git a/libbeat/common/jsontransform/transform.go b/libbeat/common/jsontransform/transform.go new file mode 100644 index 00000000000..a6d065ee430 --- /dev/null +++ b/libbeat/common/jsontransform/transform.go @@ -0,0 +1,48 @@ +package jsontransform + +import ( + "encoding/json" + + "github.com/elastic/beats/libbeat/common" +) + +// TransformNumbers walks a json decoded tree and replaces json.Number +// with int64, float64, or string, in this order of preference (i.e. if it +// parses as an int, use int. if it parses as a float, use float. etc). +func TransformNumbers(dict common.MapStr) { + for k, v := range dict { + switch vv := v.(type) { + case json.Number: + dict[k] = transformNumber(vv) + case map[string]interface{}: + TransformNumbers(vv) + case []interface{}: + transformNumbersArray(vv) + } + } +} + +func transformNumber(value json.Number) interface{} { + i64, err := value.Int64() + if err == nil { + return i64 + } + f64, err := value.Float64() + if err == nil { + return f64 + } + return value.String() +} + +func transformNumbersArray(arr []interface{}) { + for i, v := range arr { + switch vv := v.(type) { + case json.Number: + arr[i] = transformNumber(vv) + case map[string]interface{}: + TransformNumbers(vv) + case []interface{}: + transformNumbersArray(vv) + } + } +} diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go new file mode 100644 index 00000000000..1eb4167e45e --- /dev/null +++ b/libbeat/processors/actions/decode_json_fields.go @@ -0,0 +1,165 @@ +package actions + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + 
"github.com/elastic/beats/libbeat/common/jsontransform" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + "github.com/pkg/errors" +) + +type decodeJSONFields struct { + fields []string + maxDepth int + processArray bool +} + +type config struct { + Fields []string `config:"fields"` + MaxDepth int `config:"maxDepth" validate:"min=1"` + ProcessArray bool `config:"processArray"` +} + +var ( + defaultConfig = config{ + MaxDepth: 1, + ProcessArray: false, + } +) + +var debug = logp.MakeDebug("filters") + +func init() { + processors.RegisterPlugin("decode_json_fields", + configChecked(newDecodeJSONFields, + requireFields("fields"), + allowedFields("fields", "maxDepth", "processArray"))) +} + +func newDecodeJSONFields(c common.Config) (processors.Processor, error) { + config := defaultConfig + + err := c.Unpack(&config) + + if err != nil { + logp.Warn("Error unpacking config for decode_json_fields") + return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) + } + + f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray} + return f, nil +} + +// Run decodes every configured field that holds a JSON string and writes the +// decoded value back to the same field. Missing or non-string fields are +// skipped; failures are collected and returned as one joined error, or nil. +func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { + var errs []string + + for _, field := range f.fields { + data, err := event.GetValue(field) + if err != nil && errors.Cause(err) != common.ErrKeyNotFound { + debug("Error trying to GetValue for field : %s in event : %v", field, event) + errs = append(errs, err.Error()) + continue + } + text, ok := data.(string) + if ok { + var output interface{} + err := unmarshal(f.maxDepth, []byte(text), &output, f.processArray) + if err != nil { + debug("Error trying to unmarshal %s", text) + errs = append(errs, err.Error()) + continue + } + + _, err = event.Put(field, output) + if err != nil { + debug("Error trying to Put value %v for field : %s", output, field) + errs = append(errs, err.Error()) + continue + } + } + } + + if len(errs) > 0 { + return event, 
errors.New(strings.Join(errs, ", ")) + } + return event, nil +} + +func unmarshal(maxDepth int, text []byte, fields *interface{}, processArray bool) error { + if err := DecodeJSON(text, fields); err != nil { + return err + } + + maxDepth-- + if maxDepth == 0 { + return nil + } + + tryUnmarshal := func(v interface{}) (interface{}, bool) { + str, isString := v.(string) + if !isString { + return v, false + } + + var tmp interface{} + err := unmarshal(maxDepth, []byte(str), &tmp, processArray) + if err != nil { + return v, false + } + + return tmp, true + } + + // try to deep unmarshal fields + switch O := interface{}(*fields).(type) { + case map[string]interface{}: + for k, v := range O { + if decoded, ok := tryUnmarshal(v); ok { + O[k] = decoded + } + } + // We want to process arrays here + case []interface{}: + if !processArray { + break + } + + for i, v := range O { + if decoded, ok := tryUnmarshal(v); ok { + O[i] = decoded + } + } + } + return nil +} + +// DecodeJSON decodes text into to, reading numbers as json.Number. If the +// top-level value is a JSON object, its numbers are then converted by +// jsontransform.TransformNumbers; other top-level values are left as decoded. +func DecodeJSON(text []byte, to *interface{}) error { + dec := json.NewDecoder(bytes.NewReader(text)) + dec.UseNumber() + err := dec.Decode(to) + + if err != nil { + return err + } + + switch O := interface{}(*to).(type) { + case map[string]interface{}: + jsontransform.TransformNumbers(O) + } + return nil +} + +func (f decodeJSONFields) String() string { + return "decode_json_fields=" + strings.Join(f.fields, ", ") +} diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go new file mode 100644 index 00000000000..70b0bdd5df6 --- /dev/null +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -0,0 +1,128 @@ +package actions + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" +) + +var fields = [1]string{"msg"} +var testConfig, _ = common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, +}) + +func TestMissingKey(t *testing.T) { + input
:= common.MapStr{ + "pipeline": "us1", + } + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) +} + +func TestFieldNotString(t *testing.T) { + input := common.MapStr{ + "msg": 123, + "pipeline": "us1", + } + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "msg": 123, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestInvalidJSON(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", + "pipeline": "us1", + } + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", + "pipeline": "us1", + } + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestValidJSONDepthOne(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + actual := getActualValue(t, testConfig, input) + + expected := common.MapStr{ + "msg": map[string]interface{}{ + "log": "{\"level\":\"info\"}", + "stream": "stderr", + "count": 3, + }, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), actual.String()) + +} + +func TestValidJSONDepthTwo(t *testing.T) { + input := common.MapStr{ + "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", + "pipeline": "us1", + } + + depthTwoConfig, _ := common.NewConfigFrom(map[string]interface{}{ + "fields": fields, + "processArray": false, + "maxDepth": 2, + }) + + actual := getActualValue(t, depthTwoConfig, input) + + expected := common.MapStr{ + "msg": map[string]interface{}{ + "log": map[string]interface{}{ + "level": "info", + }, + "stream": "stderr", + "count": 3, + }, + "pipeline": "us1", + } + + assert.Equal(t, expected.String(), 
actual.String()) + +} + +func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + p, err := newDecodeJSONFields(*config) + if err != nil { + logp.Err("Error initializing decode_json_fields") + t.Fatal(err) + } + + actual, _ := p.Run(input) // some tests feed invalid JSON on purpose; only the returned event is asserted + + return actual +} diff --git a/vendor/github.com/elastic/go-ucfg/CHANGELOG.md b/vendor/github.com/elastic/go-ucfg/CHANGELOG.md index 1d5c8942987..4a53aa362b1 100644 --- a/vendor/github.com/elastic/go-ucfg/CHANGELOG.md +++ b/vendor/github.com/elastic/go-ucfg/CHANGELOG.md @@ -120,6 +120,7 @@ This project adheres to [Semantic Versioning](http://semver.org/). [Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.7...HEAD [0.3.7]: https://github.com/elastic/go-ucfg/compare/v0.3.6...v0.3.7 +[Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.6...HEAD [0.3.6]: https://github.com/elastic/go-ucfg/compare/v0.3.5...v0.3.6 [0.3.5]: https://github.com/elastic/go-ucfg/compare/v0.3.4...v0.3.5 [0.3.4]: https://github.com/elastic/go-ucfg/compare/v0.3.3...v0.3.4