diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 2a25847adb8..d74bf68c2a7 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add `proxy_disable` output flag to explicitly ignore proxy environment variables. {issue}11713[11713] {pull}12243[12243]
 - Processor `add_cloud_metadata` adds fields `cloud.account.id` and `cloud.image.id` for AWS EC2. {pull}12307[12307]
 - Add configurable bulk_flush_frequency in kafka output. {pull}12254[12254]
+- Add `decode_base64_field` processor for decoding base64 fields. {pull}11914[11914]
 
 *Auditbeat*
 
diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc
index 86a790af1f3..c0df787eb38 100644
--- a/libbeat/docs/processors-using.asciidoc
+++ b/libbeat/docs/processors-using.asciidoc
@@ -108,7 +108,7 @@ For example:
 ------
 +
 Similarly, for {beatname_uc} modules, you can define processors under the
-`input` section of the module definition. 
+`input` section of the module definition.
 endif::[]
 ifeval::["{beatname_lc}"=="metricbeat"]
 [source,yaml]
@@ -119,7 +119,7 @@ ifeval::["{beatname_lc}"=="metricbeat"]
   - :
       when:
-
+
 ----
 endif::[]
@@ -133,7 +133,7 @@ auditbeat.modules:
   - :
       when:
-
+
 ----
 endif::[]
@@ -142,7 +142,7 @@ For example:
 [source,yaml]
 ----
 packetbeat.protocols:
-- type: 
+- type:
   processors:
   - :
       when:
@@ -214,6 +214,7 @@ ifdef::has_decode_csv_fields_processor[]
 * <>
 endif::[]
 * <>
+* <>
 * <>
 * <>
 * <>
@@ -651,7 +652,7 @@ scalar values, arrays, dictionaries, or any nested combination of these.
 By default the fields that you specify will be grouped under the
 `fields` sub-dictionary in the event. To group the fields under a
 different sub-dictionary, use the `target` setting. To store the fields as
-top-level fields, set `target: ''`. 
+top-level fields, set `target: ''`.
 
 `target`:: (Optional) Sub-dictionary to put all fields into. Defaults to `fields`.
 `fields`:: Fields to be added.
@@ -864,6 +865,40 @@ is treated as if the field was not set at all.
 exist in the event are overwritten by keys from the decoded JSON object. The
 default value is false.
 
+[[decode-base64-field]]
+=== Decode Base64 fields
+
+The `decode_base64_field` processor specifies a field to base64 decode.
+The `field` key contains a `from: old-key` and a `to: new-key` pair: `from` is
+the source field and `to` is the target name of the field.
+
+To overwrite fields, either first rename the target field, or use the
+`drop_fields` processor to drop the field and then rename it.
+
+[source,yaml]
+-------
+processors:
+- decode_base64_field:
+    field:
+      from: "field1"
+      to: "field2"
+    ignore_missing: false
+    fail_on_error: true
+-------
+
+In the example above:
+
+- the content of `field1` is base64 decoded into `field2`
+
+The `decode_base64_field` processor has the following configuration settings:
+
+`ignore_missing`:: (Optional) If set to `true`, no error is logged when the
+key that should be base64 decoded is missing. Default is `false`.
+
+`fail_on_error`:: (Optional) If set to `true`, base64 decoding stops on the
+first error and the original event is returned. If set to `false`, decoding
+continues even if an error occurs. Default is `true`.
+
+See <<conditions>> for a list of supported conditions.
+
 [[community-id]]
 === Community ID Network Flow Hash
 
@@ -1092,7 +1127,7 @@ construct a lookup key with the value of the field `metricset.host`.
 Each Beat can define its own default indexers and matchers which are enabled by
 default. For example, FileBeat enables the `container` indexer, which indexes
 pod metadata based on all container IDs, and a `logs_path` matcher, which takes
-the `log.file.path` field, extracts the container ID, and uses it to retrieve 
+the `log.file.path` field, extracts the container ID, and uses it to retrieve
 metadata.
 
 The configuration below enables the processor when {beatname_lc} is run as a pod in
@@ -1175,7 +1210,7 @@ You can do this by mounting the socket inside the container. For example:
 To avoid privilege issues, you may also need to add `--user=root` to the
 `docker run` flags. Because the user must be part of the docker group in order
 to access `/var/run/docker.sock`, root access is required if {beatname_uc} is
-running as non-root inside the container. 
+running as non-root inside the container.
 =====
 
 [source,yaml]
diff --git a/libbeat/processors/actions/decode_base64_field.go b/libbeat/processors/actions/decode_base64_field.go
new file mode 100644
index 00000000000..e56daa4335d
--- /dev/null
+++ b/libbeat/processors/actions/decode_base64_field.go
@@ -0,0 +1,139 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+	"encoding/base64"
+	"fmt"
+
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/processors"
+	"github.com/elastic/beats/libbeat/processors/checks"
+)
+
+const (
+	processorName = "decode_base64_field"
+)
+
+type decodeBase64Field struct {
+	log *logp.Logger
+
+	config base64Config
+}
+
+type base64Config struct {
+	fromTo        `config:"field"`
+	IgnoreMissing bool `config:"ignore_missing"`
+	FailOnError   bool `config:"fail_on_error"`
+}
+
+var (
+	defaultBase64Config = base64Config{
+		IgnoreMissing: false,
+		FailOnError:   true,
+	}
+)
+
+func init() {
+	processors.RegisterPlugin(processorName,
+		checks.ConfigChecked(NewDecodeBase64Field,
+			checks.RequireFields("field"),
+			checks.AllowedFields("field", "when")))
+}
+
+// NewDecodeBase64Field constructs a new decode_base64_field processor.
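+//
+// The expected configuration shape, mirrored from base64Config above
+// (the values shown are placeholders):
+//
+//   decode_base64_field:
+//     field:
+//       from: "source-field"
+//       to: "target-field"
+//     ignore_missing: false
+//     fail_on_error: true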
+func NewDecodeBase64Field(c *common.Config) (processors.Processor, error) {
+	config := defaultBase64Config
+
+	log := logp.NewLogger(processorName)
+
+	err := c.Unpack(&config)
+	if err != nil {
+		return nil, fmt.Errorf("fail to unpack the %s configuration: %s", processorName, err)
+	}
+
+	return &decodeBase64Field{
+		log:    log,
+		config: config,
+	}, nil
+}
+
+func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
+	var backup common.MapStr
+	// Creates a copy of the event to revert in case of failure
+	if f.config.FailOnError {
+		backup = event.Fields.Clone()
+	}
+
+	err := f.decodeField(f.config.From, f.config.To, event.Fields)
+	if err != nil && f.config.FailOnError {
+		errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
+		f.log.Debug("decode base64", errMsg.Error())
+		event.Fields = backup
+		_, _ = event.PutValue("error.message", errMsg.Error())
+		return event, err
+	}
+
+	return event, nil
+}
+
+func (f decodeBase64Field) String() string {
+	return fmt.Sprintf("%s=%+v", processorName, f.config.fromTo)
+}
+
+func (f *decodeBase64Field) decodeField(from string, to string, fields common.MapStr) error {
+	value, err := fields.GetValue(from)
+	if err != nil {
+		// Ignore ErrKeyNotFound errors
+		if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
+			return nil
+		}
+		return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
+	}
+
+	text, ok := value.(string)
+	if !ok {
+		return fmt.Errorf("invalid type for `from`, expecting a string received %T", value)
+	}
+
+	decodedData, err := base64.StdEncoding.DecodeString(text)
+	if err != nil {
+		return fmt.Errorf("error trying to unmarshal %s: %v", text, err)
+	}
+
+	field := to
+	// If to is empty or equal to from, decode the field in place
+	if to == "" || from == to {
+		// Deletion must happen first to support cases where a becomes a.b
+		if err = fields.Delete(from); err != nil {
+			return fmt.Errorf("could not delete key: %s, %+v", from, err)
+		}
+
+		field = from
+	}
+
+	if _, err = fields.Put(field, string(decodedData)); err != nil {
+		return fmt.Errorf("could not put value: %s: %v, %v", decodedData, field, err)
+	}
+
+	return nil
+}
diff --git a/libbeat/processors/actions/decode_base64_field_test.go b/libbeat/processors/actions/decode_base64_field_test.go
new file mode 100644
index 00000000000..67e0127110a
--- /dev/null
+++ b/libbeat/processors/actions/decode_base64_field_test.go
@@ -0,0 +1,190 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+func TestDecodeBase64Run(t *testing.T) {
+	var testCases = []struct {
+		description string
+		config      base64Config
+		Input       common.MapStr
+		Output      common.MapStr
+		error       bool
+	}{
+		{
+			description: "simple field base64 decode",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field1", To: "field2",
+				},
+				IgnoreMissing: false,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			Output: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+				"field2": "correct data",
+			},
+			error: false,
+		},
+		{
+			description: "simple field base64 decode To empty",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field1", To: "",
+				},
+				IgnoreMissing: false,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			Output: common.MapStr{
+				"field1": "correct data",
+			},
+			error: false,
+		},
+		{
+			description: "simple field base64 decode from and to equals",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field1", To: "field1",
+				},
+				IgnoreMissing: false,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			Output: common.MapStr{
+				"field1": "correct data",
+			},
+			error: false,
+		},
+		{
+			description: "simple field bad data - fail on error",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field1", To: "field1",
+				},
+				IgnoreMissing: false,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "bad data",
+			},
+			Output: common.MapStr{
+				"field1": "bad data",
+				"error": common.MapStr{
+					"message": "failed to decode base64 fields in processor: error trying to unmarshal bad data: illegal base64 data at input byte 3",
+				},
+			},
+			error: true,
+		},
+		{
+			description: "simple field bad data fail on error false",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field1", To: "field2",
+				},
+				IgnoreMissing: false,
+				FailOnError:   false,
+			},
+			Input: common.MapStr{
+				"field1": "bad data",
+			},
+			Output: common.MapStr{
+				"field1": "bad data",
+			},
+			error: false,
+		},
+		{
+			description: "missing field",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field2", To: "field3",
+				},
+				IgnoreMissing: false,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			Output: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+				"error": common.MapStr{
+					"message": "failed to decode base64 fields in processor: could not fetch value for key: field2, Error: key not found",
+				},
+			},
+			error: true,
+		},
+		{
+			description: "missing field ignore",
+			config: base64Config{
+				fromTo: fromTo{
+					From: "field2", To: "field3",
+				},
+				IgnoreMissing: true,
+				FailOnError:   true,
+			},
+			Input: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			Output: common.MapStr{
+				"field1": "Y29ycmVjdCBkYXRh",
+			},
+			error: false,
+		},
+	}
+
+	for _, test := range testCases {
+		test := test
+		t.Run(test.description, func(t *testing.T) {
+			t.Parallel()
+
+			f := &decodeBase64Field{
+				log:    logp.NewLogger(processorName),
+				config: test.config,
+			}
+
+			event := &beat.Event{
+				Fields: test.Input,
+			}
+
+			newEvent, err := f.Run(event)
+			if !test.error {
+				assert.Nil(t, err)
+			} else {
+				assert.NotNil(t, err)
+			}
+
+			assert.Equal(t, test.Output, newEvent.Fields)
+		})
+	}
+}
diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go
index 096cadadf0f..4322a4cfbd4 100644
--- a/libbeat/processors/script/javascript/module/processor/processor.go
+++ b/libbeat/processors/script/javascript/module/processor/processor.go
@@ -54,6 +54,7 @@ var constructors = map[string]processors.Constructor{
 	"CommunityID":       communityid.New,
 	"Convert":           convert.New,
 	"CopyFields":        actions.NewCopyFields,
+	"DecodeBase64Field": actions.NewDecodeBase64Field,
 	"DecodeCSVField":    decode_csv_fields.NewDecodeCSVField,
 	"DecodeJSONFields":  actions.NewDecodeJSONFields,
 	"Dissect":           dissect.NewProcessor,
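For reference, the new processor can also be constructed and run programmatically, much as the unit tests above do. The sketch below is illustrative only and not part of this change; it assumes the import paths used elsewhere in this patch (`github.com/elastic/beats/libbeat/...`), and the field names and payload are made up.

[source,go]
----
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/processors/actions"
)

func main() {
	// Build the same configuration the YAML form would produce:
	// decode_base64_field: { field: { from: ..., to: ... }, ... }
	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"field": map[string]interface{}{
			"from": "field1",
			"to":   "field2",
		},
		"ignore_missing": false,
		"fail_on_error":  true,
	})
	if err != nil {
		panic(err)
	}

	// NewDecodeBase64Field is the constructor added by this change.
	p, err := actions.NewDecodeBase64Field(cfg)
	if err != nil {
		panic(err)
	}

	// "Y29ycmVjdCBkYXRh" is base64 for "correct data" (the same fixture the tests use).
	event := &beat.Event{Fields: common.MapStr{"field1": "Y29ycmVjdCBkYXRh"}}

	out, err := p.Run(event)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Fields["field2"]) // correct data
}
----

When `to` is empty or equal to `from`, `decodeField` decodes in place: it deletes the source key before writing the decoded value, so a scalar key such as `a` can be replaced by a nested key like `a.b`.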