Skip to content

Commit

Permalink
Merge branch 'main' into draft/sql/fetch_from_all_dbs
Browse files Browse the repository at this point in the history
  • Loading branch information
shmsr committed Jul 12, 2023
2 parents acebe9c + 2dc3f67 commit d18efee
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 28 deletions.
15 changes: 12 additions & 3 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,18 @@ func metadataKey(key string) (string, bool) {
return "", false
}

// SetErrorWithOption sets jsonErr value in the event fields according to addErrKey value.
func (e *Event) SetErrorWithOption(jsonErr mapstr.M, addErrKey bool) {
// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true.
// If you want to include the data and field you can pass them as parameters and will be appended into the
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
e.Fields["error"] = jsonErr
errorField := mapstr.M{"message": message, "type": "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
}
e.Fields["error"] = errorField
}
}
2 changes: 1 addition & 1 deletion libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func ExpandFields(logger *logp.Logger, event *beat.Event, m mapstr.M, addErrorKey bool) {
if err := expandFields(m); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrorKey)
event.SetErrorWithOption(err.Error(), addErrorKey, "", "")
}
}

Expand Down
23 changes: 6 additions & 17 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -39,11 +38,9 @@ var (

// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := expandFields(keys); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
event.SetErrorWithOption(err.Error(), addErrKey, "", "")
return
}
}
Expand All @@ -62,16 +59,14 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
case "@timestamp":
vstr, ok := v.(string)
if !ok {
logger.Error("JSON: Won't overwrite @timestamp because value is not string")
event.SetErrorWithOption(createJSONError("@timestamp not overwritten (not string)"), addErrKey)
event.SetErrorWithOption("@timestamp not overwritten (not string)", addErrKey, "", "")
continue
}

// @timestamp must be of format RFC3339 or ISO8601
ts, err := parseTimestamp(vstr)
if err != nil {
logger.Errorf("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event.SetErrorWithOption(createJSONError(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)), addErrKey)
event.SetErrorWithOption(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr), addErrKey, "", "")
continue
}
event.Timestamp = ts
Expand All @@ -93,19 +88,17 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
event.Meta.DeepUpdate(mapstr.M(m))

default:
event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey)
event.SetErrorWithOption("failed to update @metadata", addErrKey, "", "")
}

case "type":
vstr, ok := v.(string)
if !ok {
logger.Error("JSON: Won't overwrite type because value is not string")
event.SetErrorWithOption(createJSONError("type not overwritten (not string)"), addErrKey)
event.SetErrorWithOption("type not overwritten (not string)", addErrKey, "", "")
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logger.Error("JSON: Won't overwrite type because value is empty or starts with an underscore")
event.SetErrorWithOption(createJSONError(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)), addErrKey)
event.SetErrorWithOption(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr), addErrKey, "", "")
continue
}
event.Fields[k] = vstr
Expand All @@ -118,10 +111,6 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
event.Fields.DeepUpdate(keys)
}

func createJSONError(message string) mapstr.M {
return mapstr.M{"message": message, "type": "json"}
}

func removeKeys(keys map[string]interface{}, names ...string) {
for _, name := range names {
delete(keys, name)
Expand Down
226 changes: 225 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestWriteJSONKeys(t *testing.T) {
expectedMetadata mapstr.M
expectedTimestamp time.Time
expectedFields mapstr.M
addErrorKeys bool
}{
"overwrite_true": {
overwriteKeys: true,
Expand Down Expand Up @@ -192,6 +193,32 @@ func TestWriteJSONKeys(t *testing.T) {
},
},
},
// This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured
"error_case": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
"@timestamp": map[string]interface{}{"when": "now", "another": "yesterday"},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: mapstr.M{
"error": mapstr.M{
"message": "@timestamp not overwritten (not string)",
"type": "json",
},
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
addErrorKeys: true,
},
}

for name, test := range tests {
Expand All @@ -202,10 +229,207 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
})
}
}

func BenchmarkWriteJSONKeys(b *testing.B) {
now := time.Now()
now = now.Round(time.Second)

eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC)
eventMetadata := mapstr.M{
"foo": "bar",
"baz": mapstr.M{
"qux": 17,
},
}
eventFields := mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
},
}

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedFields mapstr.M
addErrorKeys bool
}{
"overwrite_true": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_true_ISO8601": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(iso8601),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_false": {
overwriteKeys: false,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": mapstr.M{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
// This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured
"error_case": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
"@timestamp": "invalid string",
},
expectedFields: mapstr.M{
"error": mapstr.M{
"message": "@timestamp not overwritten (parse error on invalid string)",
"type": "json",
},
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
addErrorKeys: true,
},
}

for name, test := range tests {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
event := &beat.Event{
Timestamp: eventTimestamp,
Meta: eventMetadata.Clone(),
Fields: eventFields.Clone(),
}
// The WriteJSONKeys will override the keys, so we need to clone it.
keys := clone(test.keys)
b.StartTimer()
WriteJSONKeys(event, keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys)
require.Equal(b, test.expectedFields, event.Fields)
}
})
}
}

func clone(a map[string]interface{}) map[string]interface{} {
newMap := make(map[string]interface{})
for k, v := range a {
newMap[k] = v
}
return newMap
}
6 changes: 1 addition & 5 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
if err != nil {
f.logger.Debugf("Error trying to unmarshal %s", text)
errs = append(errs, err.Error())
event.SetErrorWithOption(mapstr.M{
"message": "parsing input as JSON: " + err.Error(),
"data": text,
"field": field,
}, f.addErrorKey)
event.SetErrorWithOption(fmt.Sprintf("parsing input as JSON: %s", err.Error()), f.addErrorKey, text, field)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/system/filesystem/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ not be collected from filesystems matching these types. This setting also
affects the `fsstats` metricset. If this option is not set, metricbeat ignores
all types for virtual devices in systems where this information is available (e.g.
all types marked as `nodev` in `/proc/filesystems` in Linux systems). This can be set to an empty list (`[]`)
to make filebeat report all filesystems, regardless of type.
to make metricbeat report all filesystems, regardless of type.

[float]
=== Filtering
Expand Down

0 comments on commit d18efee

Please sign in to comment.