From 84bf434017a88c9d1a1860d711bfcfa4abe3c5fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 18 Jan 2022 11:39:59 +0100 Subject: [PATCH] Enable require_alias for Bulk requests for all actions when target is a write alias (#29879) ## What does this PR do? This PR adds support for requiring alias when using ILM. From now on a `Selector` can tell Elasticsearch client if the target we are shipping events to is an alias or an index. By default, we consider everything an index, and only consider a target an alias when ILM is enabled. The feature is only supported since ES 7.10, so if the user tries to connect to an older version, we cannot help them with this parameter. ## Why is it important? We see issues around ILM sometimes where users have deleted their write alias causing running beats instances to auto-create an index where the write alias should (with auto-mappings to boot, since the template won't be applied). --- CHANGELOG.next.asciidoc | 2 + libbeat/esleg/eslegclient/bulkapi.go | 9 +-- libbeat/idxmgmt/std.go | 4 ++ libbeat/outputs/elasticsearch/client.go | 8 +++ .../elasticsearch/client_integration_test.go | 7 ++- libbeat/outputs/elasticsearch/client_test.go | 60 ++++++++++++++----- .../elasticsearch/death_letter_selector.go | 2 + libbeat/outputs/outil/select.go | 2 + libbeat/outputs/output_reg.go | 2 + 9 files changed, 77 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 88562b2df6e..8eeac98795f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879] + *Auditbeat* diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 70118d57fd5..44027a837d3 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -51,10 +51,11 @@ type BulkDeleteAction struct { } type BulkMeta struct { - Index string `json:"_index" struct:"_index"` - DocType string `json:"_type,omitempty" struct:"_type,omitempty"` - Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` - ID string `json:"_id,omitempty" struct:"_id,omitempty"` + Index string `json:"_index" struct:"_index"` + DocType string `json:"_type,omitempty" struct:"_type,omitempty"` + Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` + ID string `json:"_id,omitempty" struct:"_id,omitempty"` + RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"` } type bulkRequest struct { diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index b1f0071ade2..d8f032c4f42 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -337,6 +337,8 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { return idx, err } +func (s ilmIndexSelector) IsAlias() bool { return true } + func (s indexSelector) Select(evt *beat.Event) (string, error) { if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil @@ -344,6 +346,8 @@ func (s indexSelector) Select(evt *beat.Event) (string, error) { return s.sel.Select(evt) } +func (s indexSelector) IsAlias() bool { return false } + func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { if len(evt.Meta) == 0 { return "" diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index deab29c3dcd..3900bdeb479 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -317,6 +317,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev ID: id, } + if isRequireAliasSupported(version) { + meta.RequireAlias = client.index.IsAlias() + } + if opType == events.OpTypeDelete { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil @@ -333,6 +337,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev return eslegclient.BulkIndexAction{Index: meta}, nil } +func isRequireAliasSupported(version common.Version) bool { + return !version.LessThan(common.MustNewVersion("7.10.0")) +} + func (client *Client) getPipeline(event *beat.Event) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 2b05f8a3cdb..1bf5d99ce29 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -420,7 +420,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu } info := beat.Info{Beat: "libbeat"} - im, _ := idxmgmt.DefaultSupport(nil, info, nil) + // ILM must be disabled otherwise custom index settings are ignored. + im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig()) output, err := makeES(im, info, stats, config) if err != nil { t.Fatal(err) @@ -438,6 +439,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu return client, client } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + // setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC func setupRoleMapping(t *testing.T, host string) error { _, client := connectTestEsWithoutStats(t, map[string]interface{}{ diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 2a03d10481d..9cdd43dca08 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -409,22 +409,49 @@ func TestClientWithHeaders(t *testing.T) { func TestBulkEncodeEvents(t *testing.T) { cases := map[string]struct { - version string - docType string - config common.MapStr - events []common.MapStr + version string + docType string + config common.MapStr + ilmConfig *common.Config + isAlias bool + events []common.MapStr }{ "6.x": { - version: "6.8.0", - docType: "doc", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + version: "6.8.0", + docType: "doc", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, }, - "latest": { - version: version.GetDefaultVersion(), - docType: "", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + "require_alias not supported": { + version: "7.9.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, + }, + "require_alias is supported": { + version: "7.10.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, + "latest with ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, + "latest without ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: disabledILMConfig(), + events: []common.MapStr{{"message": "test"}}, }, } @@ -437,7 +464,7 @@ func TestBulkEncodeEvents(t *testing.T) { Version: test.version, } - im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig()) + im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig) require.NoError(t, err) index, pipeline, err := buildSelectors(im, info, cfg) @@ -479,6 +506,7 @@ func TestBulkEncodeEvents(t *testing.T) { } assert.NotEqual(t, "", meta.Index) + assert.Equal(t, test.isAlias, meta.RequireAlias) assert.Equal(t, test.docType, meta.DocType) } @@ -487,6 +515,10 @@ func TestBulkEncodeEvents(t *testing.T) { } } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + func TestBulkEncodeEventsWithOpType(t *testing.T) { cases := []common.MapStr{ {"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0}, diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go index 02bd3780cab..34184c80c1a 100644 --- a/libbeat/outputs/elasticsearch/death_letter_selector.go +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -34,3 +34,5 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) { } return d.Selector.Select(event) } + +func (d DeadLetterSelector) IsAlias() bool { return false } diff --git a/libbeat/outputs/outil/select.go b/libbeat/outputs/outil/select.go index 1615a3bdb11..ebe55674e4a 100644 --- a/libbeat/outputs/outil/select.go +++ b/libbeat/outputs/outil/select.go @@ -87,6 +87,8 @@ func (s Selector) Select(evt *beat.Event) (string, error) { return s.sel.sel(evt) } +func (s Selector) IsAlias() bool { return false } + // IsEmpty checks if the selector is not configured and will always return an empty string. func (s Selector) IsEmpty() bool { return s.sel == nilSelector || s.sel == nil diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 86c1323c505..f4abc63298a 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -43,8 +43,10 @@ type IndexManager interface { } // IndexSelector is used to find the index name an event shall be indexed to. +// It also used to check if during indexing required_alias should be set. type IndexSelector interface { Select(event *beat.Event) (string, error) + IsAlias() bool } // Group configures and combines multiple clients into load-balanced group of clients