Skip to content

Commit

Permalink
Add backwards compatibility for append allow_duplicates (#21159) (#21191
Browse files Browse the repository at this point in the history
)

(cherry picked from commit e51494f)
  • Loading branch information
marc-gr committed Sep 21, 2020
1 parent e2102e3 commit 95e7e11
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]
- Fix error when processing AWS Cloudtrail Digest logs. {pull}21086[21086] {issue}20943[20943]
- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908]
- Provide backwards compatibility for the `append` processor when Elasticsearch is less than 7.10.0. {pull}21159[21159]

*Heartbeat*

Expand Down
74 changes: 74 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return fmt.Errorf("failed to modify set processor in pipeline: %v", err)
}

if err := modifyAppendProcessor(esClient.GetVersion(), pipelineID, content); err != nil {
return fmt.Errorf("failed to modify append processor in pipeline: %v", err)
}

body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
Expand Down Expand Up @@ -291,3 +295,73 @@ func modifySetProcessor(esVersion common.Version, pipelineID string, content map
}
return nil
}

// modifyAppendProcessor replaces allow_duplicates option with an if statement
// so ES less than 7.10 will still work
func modifyAppendProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error {
flagVersion := common.MustNewVersion("7.10.0")
if !esVersion.LessThan(flagVersion) {
return nil
}

p, ok := content["processors"]
if !ok {
return nil
}
processors, ok := p.([]interface{})
if !ok {
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p)
}

for _, p := range processors {
processor, ok := p.(map[string]interface{})
if !ok {
continue
}
if options, ok := processor["append"].(map[string]interface{}); ok {
allow, ok := options["allow_duplicates"].(bool)
if !ok {
// don't have allow_duplicates, nothing to do
continue
}

logp.Debug("modules", "In pipeline %q removing unsupported 'allow_duplicates' in append processor", pipelineID)
delete(options, "allow_duplicates")
if allow {
// it was set to true, nothing else to do after removing the option
continue
}

currIf, _ := options["if"].(string)
if strings.Contains(strings.ToLower(currIf), "contains") {
// if it has a contains statement, we assume it is checking for duplicates already
continue
}
field, ok := options["field"].(string)
if !ok {
continue
}
val, ok := options["value"].(string)
if !ok {
continue
}

field = strings.ReplaceAll(field, ".", "?.")

val = strings.TrimLeft(val, "{ ")
val = strings.TrimRight(val, "} ")
val = strings.ReplaceAll(val, ".", "?.")

if currIf == "" {
// if there is not a previous if we add a value sanity check
currIf = fmt.Sprintf("ctx?.%s != null", val)
}

newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val)

logp.Debug("modules", "In pipeline %q adding if %s to replace 'allow_duplicates: false' in append processor", pipelineID, newIf)
options["if"] = newIf
}
}
return nil
}
202 changes: 202 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,205 @@ func TestModifySetProcessor(t *testing.T) {
})
}
}

func TestModifyAppendProcessor(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.10.0: set to true",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
},
},
},
},
isErrExpected: false,
},
{
name: "ES < 7.10.0: set to false",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.10.0",
esVersion: common.MustNewVersion("7.10.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
},
},
},
},
isErrExpected: false,
},
{
name: "ES > 7.10.0",
esVersion: common.MustNewVersion("8.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
},
},
},
},
isErrExpected: false,
},
{
name: "ES < 7.10.0: existing if",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
"if": "ctx?.host?.hostname != null",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)",
},
},
}},
isErrExpected: false,
},
{
name: "ES < 7.10.0: existing if with contains",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"allow_duplicates": false,
"if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)",
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"value": "{{host.hostname}}",
"if": "!ctx?.related?.hosts.contains(ctx?.host?.hostname)",
},
},
}},
isErrExpected: false,
},
{
name: "ES < 7.10.0: no value",
esVersion: common.MustNewVersion("7.7.7"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
"allow_duplicates": false,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"append": map[string]interface{}{
"field": "related.hosts",
},
},
}},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := modifyAppendProcessor(*test.esVersion, "foo-pipeline", test.content)
if test.isErrExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.content, test.name)
}
})
}
}

0 comments on commit 95e7e11

Please sign in to comment.