diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 4f10509a123..a0e555166cc 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -200,6 +200,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update RPM packages contained in Beat Docker images. {issue}17035[17035]
- Update supported versions of `redis` output. {pull}17198[17198]
- Update documentation for system.process.memory fields to include clarification on Windows os's. {pull}17268[17268]
+- Add `replace` processor for replacing string values of fields. {pull}17342[17342]
- Add optional regex based cid extractor to `add_kubernetes_metadata` processor. {pull}17360[17360]
- Add `urldecode` processor to for decoding URL-encoded fields. {pull}17505[17505]
- Add support for AWS IAM `role_arn` in credentials config. {pull}17658[17658] {issue}12464[12464]
diff --git a/libbeat/processors/actions/docs/replace.asciidoc b/libbeat/processors/actions/docs/replace.asciidoc
new file mode 100644
index 00000000000..3faf3e0bcce
--- /dev/null
+++ b/libbeat/processors/actions/docs/replace.asciidoc
@@ -0,0 +1,49 @@
+[[replace-fields]]
+=== Replace fields from events
+
+++++
+replace
+++++
+
+The `replace` processor takes a list of fields to replace the field value
+matching a pattern with replacement string. Under the `fields` key, each entry
+contains a `field: field-name`, `pattern: regex-pattern` and
+`replacement: replacement-string`, where:
+
+* `field` is the original field name
+* `pattern` is regex pattern to match field's value
+* `replacement` is the replacement string to use for updating the field's value
+
+The `replace` processor cannot be used to replace value with a completely new value.
+
+TIP: You can replace field value to truncate part of field value or replace
+it with a new string. It can also be used for masking PII information.
+
+Following example will change path from /usr/bin to /usr/local/bin
+
+[source,yaml]
+-------
+processors:
+- replace:
+ fields:
+ - field: "file.path"
+ pattern: "/usr/"
+ replacement: "/usr/local/"
+ ignore_missing: false
+ fail_on_error: true
+-------
+
+The `replace` processor has following configuration settings:
+
+`ignore_missing`:: (Optional) If set to true, no error is logged in case a specifiedfield
+is missing. Default is `false`.
+
+`fail_on_error`:: (Optional) If set to true, in case of an error the replacement of
+field values is stopped and the original event is returned. If set to false, replacement
+continues even if an error occurs during replacement. Default is `true`.
+
+See <> for a list of supported conditions.
+
+You can specify multiple `ignore_missing` processors under the `processors`
+section.
+
diff --git a/libbeat/processors/actions/replace.go b/libbeat/processors/actions/replace.go
new file mode 100644
index 00000000000..37245817050
--- /dev/null
+++ b/libbeat/processors/actions/replace.go
@@ -0,0 +1,118 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+ "fmt"
+ "regexp"
+
+ "github.com/pkg/errors"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+ "github.com/elastic/beats/v7/libbeat/logp"
+ "github.com/elastic/beats/v7/libbeat/processors"
+ "github.com/elastic/beats/v7/libbeat/processors/checks"
+ jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
+)
+
+type replaceString struct {
+ config replaceStringConfig
+}
+
+type replaceStringConfig struct {
+ Fields []replaceConfig `config:"fields"`
+ IgnoreMissing bool `config:"ignore_missing"`
+ FailOnError bool `config:"fail_on_error"`
+}
+
+type replaceConfig struct {
+ Field string `config:"field"`
+ Pattern *regexp.Regexp `config:"pattern"`
+ Replacement string `config:"replacement"`
+}
+
+func init() {
+ processors.RegisterPlugin("replace",
+ checks.ConfigChecked(NewReplaceString,
+ checks.RequireFields("fields")))
+
+ jsprocessor.RegisterPlugin("Replace", NewReplaceString)
+}
+
+// NewReplaceString returns a new replace processor.
+func NewReplaceString(c *common.Config) (processors.Processor, error) {
+ config := replaceStringConfig{
+ IgnoreMissing: false,
+ FailOnError: true,
+ }
+ err := c.Unpack(&config)
+ if err != nil {
+ return nil, fmt.Errorf("failed to unpack the replace configuration: %s", err)
+ }
+
+ f := &replaceString{
+ config: config,
+ }
+ return f, nil
+}
+
+func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
+ var backup common.MapStr
+ // Creates a copy of the event to revert in case of failure
+ if f.config.FailOnError {
+ backup = event.Fields.Clone()
+ }
+
+ for _, field := range f.config.Fields {
+ err := f.replaceField(field.Field, field.Pattern, field.Replacement, event.Fields)
+ if err != nil {
+ errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err)
+ logp.Debug("replace", errMsg.Error())
+ if f.config.FailOnError {
+ event.Fields = backup
+ event.PutValue("error.message", errMsg.Error())
+ return event, err
+ }
+ }
+ }
+
+ return event, nil
+}
+
+func (f *replaceString) replaceField(field string, pattern *regexp.Regexp, replacement string, fields common.MapStr) error {
+ currentValue, err := fields.GetValue(field)
+ if err != nil {
+ // Ignore ErrKeyNotFound errors
+ if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
+ return nil
+ }
+ return fmt.Errorf("could not fetch value for key: %s, Error: %s", field, err)
+ }
+
+ updatedString := pattern.ReplaceAllString(currentValue.(string), replacement)
+ _, err = fields.Put(field, updatedString)
+ if err != nil {
+ return fmt.Errorf("could not put value: %s: %v, %v", replacement, currentValue, err)
+ }
+ return nil
+}
+
+func (f *replaceString) String() string {
+ return "replace=" + fmt.Sprintf("%+v", f.config.Fields)
+}
diff --git a/libbeat/processors/actions/replace_test.go b/libbeat/processors/actions/replace_test.go
new file mode 100644
index 00000000000..e54d16c5012
--- /dev/null
+++ b/libbeat/processors/actions/replace_test.go
@@ -0,0 +1,248 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package actions
+
+import (
+ "reflect"
+ "regexp"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/common"
+)
+
+func TestReplaceRun(t *testing.T) {
+ var tests = []struct {
+ description string
+ Fields []replaceConfig
+ IgnoreMissing bool
+ FailOnError bool
+ Input common.MapStr
+ Output common.MapStr
+ error bool
+ }{
+ {
+ description: "simple field replacing",
+ Fields: []replaceConfig{
+ {
+ Field: "f",
+ Pattern: regexp.MustCompile(`a`),
+ Replacement: "b",
+ },
+ },
+ Input: common.MapStr{
+ "f": "abc",
+ },
+ Output: common.MapStr{
+ "f": "bbc",
+ },
+ error: false,
+ IgnoreMissing: false,
+ FailOnError: true,
+ },
+ {
+ description: "Add one more hierarchy to event",
+ Fields: []replaceConfig{
+ {
+ Field: "f.b",
+ Pattern: regexp.MustCompile(`a`),
+ Replacement: "b",
+ },
+ },
+ Input: common.MapStr{
+ "f": common.MapStr{
+ "b": "abc",
+ },
+ },
+ Output: common.MapStr{
+ "f": common.MapStr{
+ "b": "bbc",
+ },
+ },
+ error: false,
+ IgnoreMissing: false,
+ FailOnError: true,
+ },
+ {
+ description: "replace two fields at the same time.",
+ Fields: []replaceConfig{
+ {
+ Field: "f",
+ Pattern: regexp.MustCompile(`a.*c`),
+ Replacement: "cab",
+ },
+ {
+ Field: "g",
+ Pattern: regexp.MustCompile(`ef`),
+ Replacement: "oor",
+ },
+ },
+ Input: common.MapStr{
+ "f": "abbbc",
+ "g": "def",
+ },
+ Output: common.MapStr{
+ "f": "cab",
+ "g": "door",
+ },
+ error: false,
+ IgnoreMissing: false,
+ FailOnError: true,
+ },
+ {
+ description: "test missing fields",
+ Fields: []replaceConfig{
+ {
+ Field: "f",
+ Pattern: regexp.MustCompile(`abc`),
+ Replacement: "xyz",
+ },
+ {
+ Field: "g",
+ Pattern: regexp.MustCompile(`def`),
+ Replacement: "",
+ },
+ },
+ Input: common.MapStr{
+ "m": "abc",
+ "n": "def",
+ },
+ Output: common.MapStr{
+ "m": "abc",
+ "n": "def",
+ "error": common.MapStr{
+ "message": "Failed to replace fields in processor: could not fetch value for key: f, Error: key not found",
+ },
+ },
+ error: true,
+ IgnoreMissing: false,
+ FailOnError: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.description, func(t *testing.T) {
+ f := &replaceString{
+ config: replaceStringConfig{
+ Fields: test.Fields,
+ IgnoreMissing: test.IgnoreMissing,
+ FailOnError: test.FailOnError,
+ },
+ }
+ event := &beat.Event{
+ Fields: test.Input,
+ }
+
+ newEvent, err := f.Run(event)
+ if !test.error {
+ assert.Nil(t, err)
+ } else {
+ assert.NotNil(t, err)
+ }
+
+ assert.True(t, reflect.DeepEqual(newEvent.Fields, test.Output))
+ })
+ }
+}
+
+func TestReplaceField(t *testing.T) {
+ var tests = []struct {
+ Field string
+ Pattern *regexp.Regexp
+ Replacement string
+ ignoreMissing bool
+ failOnError bool
+ Input common.MapStr
+ Output common.MapStr
+ error bool
+ description string
+ }{
+ {
+ description: "replace part of field value with another string",
+ Field: "f",
+ Pattern: regexp.MustCompile(`a`),
+ Replacement: "b",
+ Input: common.MapStr{
+ "f": "abc",
+ },
+ Output: common.MapStr{
+ "f": "bbc",
+ },
+ error: false,
+ failOnError: true,
+ ignoreMissing: false,
+ },
+ {
+ description: "Add hierarchy to event and replace",
+ Field: "f.b",
+ Pattern: regexp.MustCompile(`a`),
+ Replacement: "b",
+ Input: common.MapStr{
+ "f": common.MapStr{
+ "b": "abc",
+ },
+ },
+ Output: common.MapStr{
+ "f": common.MapStr{
+ "b": "bbc",
+ },
+ },
+ error: false,
+ ignoreMissing: false,
+ failOnError: true,
+ },
+ {
+ description: "try replacing value of missing fields in event",
+ Field: "f",
+ Pattern: regexp.MustCompile(`abc`),
+ Replacement: "xyz",
+ Input: common.MapStr{
+ "m": "abc",
+ "n": "def",
+ },
+ Output: common.MapStr{
+ "m": "abc",
+ "n": "def",
+ },
+ error: true,
+ ignoreMissing: false,
+ failOnError: true,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.description, func(t *testing.T) {
+
+ f := &replaceString{
+ config: replaceStringConfig{
+ IgnoreMissing: test.ignoreMissing,
+ FailOnError: test.failOnError,
+ },
+ }
+
+ err := f.replaceField(test.Field, test.Pattern, test.Replacement, test.Input)
+ if err != nil {
+ assert.Equal(t, test.error, true)
+ }
+
+ assert.True(t, reflect.DeepEqual(test.Input, test.Output))
+ })
+ }
+}