Skip to content

Commit

Permalink
Cherry-pick #18854 to 7.7: Make selector string casing configurable (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering committed Jun 15, 2020
1 parent 3c5b8cb commit aa0d320
Show file tree
Hide file tree
Showing 16 changed files with 526 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818]
- Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]

*Auditbeat*

Expand Down
8 changes: 5 additions & 3 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package idxmgmt
import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -197,6 +198,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: mode != ilm.ModeEnabled,
Case: outil.SelectorLowerCase,
}

indexSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings)
Expand Down Expand Up @@ -354,15 +356,15 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {

if tmp := evt.Meta["alias"]; tmp != nil {
if alias, ok := tmp.(string); ok {
return alias
return strings.ToLower(alias)
}
}

if tmp := evt.Meta["index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
strings.ToLower(idx), ts.Year(), ts.Month(), ts.Day())
}
}

Expand All @@ -372,7 +374,7 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
// which are then expanded by a processor to the "raw_index" field.
if tmp := evt.Meta["raw_index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
return idx
return strings.ToLower(idx)
}
}

Expand Down
44 changes: 44 additions & 0 deletions libbeat/idxmgmt/std_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "TeSt-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -147,6 +152,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "test",
},
},
"event alias without ilm must be lowercae": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test"),
meta: common.MapStr{
"alias": "Test",
},
},
"event index without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -155,11 +168,24 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"index": "test",
},
},
"event index without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: dateIdx("test"),
meta: common.MapStr{
"index": "Test",
},
},
"with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"with ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("Test-9.9.9", "Test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias wit ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -168,6 +194,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "event-alias",
},
},
"event alias wit ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("event-alias"),
meta: common.MapStr{
"alias": "Event-alias",
},
},
"event index with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -186,6 +220,16 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
},
want: stable("myindex"),
},
"use indices settings must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{
"index": "test-%{[agent.version]}",
"indices": []map[string]interface{}{
{"index": "MyIndex"},
},
},
want: stable("myindex"),
},
}
for name, test := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -327,7 +328,7 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
if event.Meta != nil {
if pipeline, exists := event.Meta["pipeline"]; exists {
if p, ok := pipeline.(string); ok {
return p, nil
return strings.ToLower(p), nil
}
return "", errors.New("pipeline metadata is no string")
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func doClientPing(t *testing.T) {
Headers: map[string]string{headerTestField: headerTestValue},
ProxyDisable: proxyDisable != "",
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestClientWithHeaders(t *testing.T) {
"X-Test": "testing value",
},
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}, nil)
assert.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,7 @@ func buildSelectors(
return index, pipeline, err
}

pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
})
pipelineSel, err := buildPipelineSelector(cfg)
if err != nil {
return index, pipeline, err
}
Expand All @@ -148,3 +143,13 @@ func buildSelectors(

return index, pipeline, err
}

func buildPipelineSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
Case: outil.SelectorLowerCase,
})
}
58 changes: 58 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

Expand Down Expand Up @@ -73,3 +75,59 @@ func TestGlobalConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestPipelineSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"no pipline configured": {},
"pipeline configured": {
cfg: map[string]interface{}{"pipeline": "test"},
want: "test",
},
"pipeline must be lowercase": {
cfg: map[string]interface{}{"pipeline": "Test"},
want: "test",
},
"pipeline via event meta": {
event: beat.Event{Meta: common.MapStr{"pipeline": "test"}},
want: "test",
},
"pipeline via event meta must be lowercase": {
event: beat.Event{Meta: common.MapStr{"pipeline": "Test"}},
want: "test",
},
"pipelines setting": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "test"}},
},
want: "test",
},
"pipelines setting must be lowercase": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "Test"}},
},
want: "test",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := getPipeline(&test.event, &selector)
if err != nil {
t.Fatalf("Failed to create pipeline name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
62 changes: 62 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka
import (
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -97,3 +98,64 @@ func TestConfigInvalid(t *testing.T) {
})
}
}

func TestTopicSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"topic configured": {
cfg: map[string]interface{}{"topic": "test"},
want: "test",
},
"topic must keep case": {
cfg: map[string]interface{}{"topic": "Test"},
want: "Test",
},
"topics setting": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "test"}},
},
want: "test",
},
"topics setting must keep case": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "Test"}},
},
want: "Test",
},
"use event field": {
cfg: map[string]interface{}{"topic": "test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "from-event"},
},
want: "test-from-event",
},
"use event field must keep case": {
cfg: map[string]interface{}{"topic": "Test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "From-Event"},
},
want: "Test-From-Event",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := selector.Select(&test.event)
if err != nil {
t.Fatalf("Failed to create topic name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
17 changes: 11 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ func makeKafka(
return outputs.Fail(err)
}

topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
topic, err := buildTopicSelector(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -102,3 +97,13 @@ func makeKafka(
}
return outputs.Success(config.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
}
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func esConnect(t *testing.T, index string) *esConnection {

host := getElasticsearchHost()
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "", outil.SelectorLowerCase)
indexSel := outil.MakeSelector(indexFmtExpr)
index, _ = indexSel.Select(&beat.Event{
Timestamp: ts,
Expand Down
Loading

0 comments on commit aa0d320

Please sign in to comment.