From d81c39f454faa1af3499fbb914ac0491ef0aec14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 17 Jan 2022 17:40:49 +0100 Subject: [PATCH 1/6] Reinit --- libbeat/esleg/eslegclient/bulkapi.go | 9 +++++---- libbeat/idxmgmt/std.go | 4 ++++ libbeat/outputs/elasticsearch/client.go | 9 +++++---- libbeat/outputs/elasticsearch/death_letter_selector.go | 2 ++ libbeat/outputs/outil/select.go | 2 ++ libbeat/outputs/output_reg.go | 2 ++ 6 files changed, 20 insertions(+), 8 deletions(-) diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 70118d57fd5..0b76260e672 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -51,10 +51,11 @@ type BulkDeleteAction struct { } type BulkMeta struct { - Index string `json:"_index" struct:"_index"` - DocType string `json:"_type,omitempty" struct:"_type,omitempty"` - Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` - ID string `json:"_id,omitempty" struct:"_id,omitempty"` + Index string `json:"_index" struct:"_index"` + DocType string `json:"_type,omitempty" struct:"_type,omitempty"` + Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` + ID string `json:"_id,omitempty" struct:"_id,omitempty"` + RequireAlias bool `json:"require_alias" struct:"require_alias"` } type bulkRequest struct { diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index b1f0071ade2..d8f032c4f42 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -337,6 +337,8 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { return idx, err } +func (s ilmIndexSelector) IsAlias() bool { return true } + func (s indexSelector) Select(evt *beat.Event) (string, error) { if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil @@ -344,6 +346,8 @@ func (s indexSelector) Select(evt *beat.Event) (string, error) { return s.sel.Select(evt) } +func (s indexSelector) IsAlias() bool { return false } + func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { if len(evt.Meta) == 0 { return "" diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index deab29c3dcd..691e519b36c 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -311,10 +311,11 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev opType := events.GetOpType(*event) meta := eslegclient.BulkMeta{ - Index: index, - DocType: eventType, - Pipeline: pipeline, - ID: id, + Index: index, + DocType: eventType, + Pipeline: pipeline, + ID: id, + RequireAlias: client.index.IsAlias(), } if opType == events.OpTypeDelete { diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go index 02bd3780cab..34184c80c1a 100644 --- a/libbeat/outputs/elasticsearch/death_letter_selector.go +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -34,3 +34,5 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) { } return d.Selector.Select(event) } + +func (d DeadLetterSelector) IsAlias() bool { return false } diff --git a/libbeat/outputs/outil/select.go b/libbeat/outputs/outil/select.go index 1615a3bdb11..ebe55674e4a 100644 --- a/libbeat/outputs/outil/select.go +++ b/libbeat/outputs/outil/select.go @@ -87,6 +87,8 @@ func (s Selector) Select(evt *beat.Event) (string, error) { return s.sel.sel(evt) } +func (s Selector) IsAlias() bool { return false } + // IsEmpty checks if the selector is not configured and will always return an empty string. func (s Selector) IsEmpty() bool { return s.sel == nilSelector || s.sel == nil diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 86c1323c505..f4abc63298a 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -43,8 +43,10 @@ type IndexManager interface { } // IndexSelector is used to find the index name an event shall be indexed to. +// It also used to check if during indexing required_alias should be set. type IndexSelector interface { Select(event *beat.Event) (string, error) + IsAlias() bool } // Group configures and combines multiple clients into load-balanced group of clients From c99ebc6ff0ea29f17cb5e375ac7abadbc91b24a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 17 Jan 2022 18:08:40 +0100 Subject: [PATCH 2/6] add tests --- libbeat/outputs/elasticsearch/client.go | 7 ++- libbeat/outputs/elasticsearch/client_test.go | 45 ++++++++++++++------ 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 691e519b36c..490b62d7a34 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -310,12 +310,17 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev id, _ := events.GetMetaStringValue(*event, events.FieldMetaID) opType := events.GetOpType(*event) + alias := client.index.IsAlias() + if version.Major < 7 { + alias = false + } + meta := eslegclient.BulkMeta{ Index: index, DocType: eventType, Pipeline: pipeline, ID: id, - RequireAlias: client.index.IsAlias(), + RequireAlias: alias, } if opType == events.OpTypeDelete { diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 2a03d10481d..c05a9d75b33 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -409,22 +409,34 @@ func TestClientWithHeaders(t *testing.T) { func TestBulkEncodeEvents(t *testing.T) { cases := map[string]struct { - version string - docType string - config common.MapStr - events []common.MapStr + version string + docType string + config common.MapStr + ilmConfig *common.Config + isAlias bool + events []common.MapStr }{ "6.x": { - version: "6.8.0", - docType: "doc", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + version: "6.8.0", + docType: "doc", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, }, - "latest": { - version: version.GetDefaultVersion(), - docType: "", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + "latest with ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, + "latest without ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: disabledILMConfig(), + events: []common.MapStr{{"message": "test"}}, }, } @@ -437,7 +449,7 @@ func TestBulkEncodeEvents(t *testing.T) { Version: test.version, } - im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig()) + im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig) require.NoError(t, err) index, pipeline, err := buildSelectors(im, info, cfg) @@ -479,6 +491,7 @@ func TestBulkEncodeEvents(t *testing.T) { } assert.NotEqual(t, "", meta.Index) + assert.Equal(t, test.isAlias, meta.RequireAlias) assert.Equal(t, test.docType, meta.DocType) } @@ -487,6 +500,10 @@ func TestBulkEncodeEvents(t *testing.T) { } } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + func TestBulkEncodeEventsWithOpType(t *testing.T) { cases := []common.MapStr{ {"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0}, From fe3807056d9480ac2a9f82153c1bc6d71d4cef35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 17 Jan 2022 18:11:12 +0100 Subject: [PATCH 3/6] add changelog entry --- CHANGELOG.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 88562b2df6e..8eeac98795f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879] + *Auditbeat* From 1c9ecb169a260d5ff5f57025d9710dadaf57a067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 17 Jan 2022 18:22:29 +0100 Subject: [PATCH 4/6] feature is only supported since 7.10 --- libbeat/esleg/eslegclient/bulkapi.go | 2 +- libbeat/outputs/elasticsearch/client.go | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 0b76260e672..4c72a2ad932 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -55,7 +55,7 @@ type BulkMeta struct { DocType string `json:"_type,omitempty" struct:"_type,omitempty"` Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` ID string `json:"_id,omitempty" struct:"_id,omitempty"` - RequireAlias bool `json:"require_alias" struct:"require_alias"` + RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias"` } type bulkRequest struct { diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 490b62d7a34..6c0af16c46d 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -310,17 +310,15 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev id, _ := events.GetMetaStringValue(*event, events.FieldMetaID) opType := events.GetOpType(*event) - alias := client.index.IsAlias() - if version.Major < 7 { - alias = false + meta := eslegclient.BulkMeta{ + Index: index, + DocType: eventType, + Pipeline: pipeline, + ID: id, } - meta := eslegclient.BulkMeta{ - Index: index, - DocType: eventType, - Pipeline: pipeline, - ID: id, - RequireAlias: alias, + if isRequireAliasSupported(version) { + meta.RequireAlias = client.index.IsAlias() } if opType == events.OpTypeDelete { @@ -339,6 +337,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev return eslegclient.BulkIndexAction{Index: meta}, nil } +func isRequireAliasSupported(version common.Version) bool { + return !version.LessThan(common.MustNewVersion("7.9.0")) +} + func (client *Client) getPipeline(event *beat.Event) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) From 40aad495ebd862098a5e85a32e237cc7a35b9426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 18 Jan 2022 09:02:02 +0100 Subject: [PATCH 5/6] address review notes --- libbeat/esleg/eslegclient/bulkapi.go | 2 +- libbeat/outputs/elasticsearch/client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 4c72a2ad932..44027a837d3 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -55,7 +55,7 @@ type BulkMeta struct { DocType string `json:"_type,omitempty" struct:"_type,omitempty"` Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` ID string `json:"_id,omitempty" struct:"_id,omitempty"` - RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias"` + RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"` } type bulkRequest struct { diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 6c0af16c46d..3900bdeb479 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -338,7 +338,7 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev } func isRequireAliasSupported(version common.Version) bool { - return !version.LessThan(common.MustNewVersion("7.9.0")) + return !version.LessThan(common.MustNewVersion("7.10.0")) } func (client *Client) getPipeline(event *beat.Event) (string, error) { From 3f61d77a88660e19148a3a21831b6ab614c5c3cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 18 Jan 2022 09:02:16 +0100 Subject: [PATCH 6/6] add more tests and follow up in integration tests --- .../elasticsearch/client_integration_test.go | 7 ++++++- libbeat/outputs/elasticsearch/client_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 2b05f8a3cdb..1bf5d99ce29 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -420,7 +420,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu } info := beat.Info{Beat: "libbeat"} - im, _ := idxmgmt.DefaultSupport(nil, info, nil) + // ILM must be disabled otherwise custom index settings are ignored. + im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig()) output, err := makeES(im, info, stats, config) if err != nil { t.Fatal(err) @@ -438,6 +439,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu return client, client } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + // setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC func setupRoleMapping(t *testing.T, host string) error { _, client := connectTestEsWithoutStats(t, map[string]interface{}{ diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index c05a9d75b33..9cdd43dca08 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -423,6 +423,21 @@ func TestBulkEncodeEvents(t *testing.T) { ilmConfig: common.NewConfig(), events: []common.MapStr{{"message": "test"}}, }, + "require_alias not supported": { + version: "7.9.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, + }, + "require_alias is supported": { + version: "7.10.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, "latest with ILM": { version: version.GetDefaultVersion(), docType: "",